data_task.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. #include "data_task.h"
  2. #include "usart.h"
  3. #include "sys_mqtt.h"
  4. #include "sys_http.h"
  5. #include "mmodbus.h"
  6. #include "gateway_message.h"
  7. #include "dlt645_port.h"
  8. #include "myFile.h"
  9. #include "timer.h"
  10. #include "led.h"
  11. #include "tcp_server.h"
  12. #include "log.h"
  13. #include "app_ethernet.h"
  14. void protocolsModeFunc(GATEWAY_PARAMS* current_device, char* string);
  15. void transparentModeFunc(GATEWAY_PARAMS* current_device, char* string);
  16. //recv_state:读写标志位 0:失败 1:成功
  17. //mode:读取数据的方式 0:全部数据 1:不同数据
  18. //def:检测mode = 1,string是否含有数据
  19. //startFlag:读数据启动位
  20. uint8_t recv_state = 0, mode = 0, def = 0, startFlag = 0;
  21. int time1,time2,transparent_lenth;
  22. void data_task(void const * argument)
  23. {
  24. dlt645_init(10); // 若读不到数据,则延时 参数 秒
  25. mmodbus_init(10);// 若读不到数据,则延时 参数 秒
  26. GATEWAY_PARAMS *get;
  27. char *device_config_json = mymalloc(SRAMEX, 20 * 1024);
  28. if(device_config_json == NULL)
  29. LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"device_config_json malloc fail");
  30. memset(device_config_json,0,strlen(device_config_json));
  31. read_file("device.txt", device_config_json);
  32. addGatewayParams(device_config_json);
  33. myfree(SRAMEX,device_config_json);
  34. get= get_gateway_config_params();
  35. LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"device params not empty");
  36. DEVICE_PARAMS *current_device=get->device_params;
  37. char *string = mymalloc(SRAMEX, 5 * 1024); // 接收读取数据
  38. if(string == NULL)
  39. LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"string malloc fail");
  40. memset(string,0,strlen(string));
  41. while (current_device!=NULL)
  42. {
  43. if(ProtocolsModeFlag)
  44. {
  45. protocolsModeFunc(get, string);
  46. }
  47. else if(TransparentModeFlag)
  48. {
  49. transparentModeFunc(get, string);
  50. }
  51. current_device=get->device_params;
  52. HAL_GPIO_WritePin(GPIOF,LED_PIN,GPIO_PIN_RESET);
  53. vTaskDelay(500);
  54. }
  55. myfree(SRAMEX, string);
  56. LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"data_task return");\
  57. while(1){};
  58. }
  59. /*
  60. *********************************************************************************************************
  61. * 函 数 名: int compareArrays(uint8_t arr1[], uint8_t arr2[], int size)
  62. * 功能说明: 比较两个数组是否相同
  63. * 形 参: arr1[] 数组1,arr2[] 数组2,size 比较数组的大小
  64. * 返 回 值: 1: 相同 0:不相同
  65. *********************************************************************************************************
  66. */
  67. int compareArrays(uint8_t arr1[], uint8_t arr2[], int size) {
  68. for (int i = 0; i < size; ++i) {
  69. if (arr1[i] != arr2[i]) {
  70. return 1; // 两个数组不相同,返回0
  71. }
  72. }
  73. return 0; // 两个数组相同,返回1
  74. }
  75. /*
  76. *********************************************************************************************************
  77. * 函 数 名: int READ_MODBUS_DATA(DEVICE_PARAMS *device)
  78. * 功能说明: 读取当前节点上的modbus数据
  79. * 形 参: DEVICE_PARAMS *device 当前设备
  80. * 返 回 值: 1: 成功 0:失败
  81. *********************************************************************************************************
  82. */
  83. int read_device_data(DEVICE_PARAMS *device, char* string)
  84. {
  85. DEVICE_PARAMS *current_device=device;
  86. GATEWAY_READ_MODBUS_COMMAND *currentModbusParams = current_device->params->gateway_read_modbus_command;
  87. GATEWAY_READ_DLT645_COMMAND *currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  88. while(current_device->params != NULL)
  89. {
  90. if (current_device->protocol == MODBUS)
  91. {
  92. uint16_t data[currentModbusParams->registerByteNum /2];
  93. mmodbus_set16bitOrder(current_device->MDBbigLittleFormat);
  94. // 读单个寄存器
  95. if (currentModbusParams->functionCode == 0x03)
  96. {
  97. // bool success = mmodbus_readHoldingRegisters16i(0x17,0x00,0x02,data);
  98. bool success = mmodbus_readHoldingRegisters16i(currentModbusParams->slaveAddress,
  99. currentModbusParams->registerAddress,
  100. currentModbusParams->registerByteNum /2,
  101. data);
  102. if (success)
  103. {
  104. uint32_t value;
  105. if (currentModbusParams->registerByteNum == 4)
  106. {
  107. value = (uint32_t)data[0] | data[1];
  108. }
  109. else if (currentModbusParams->registerByteNum == 2)
  110. {
  111. value = data[0];
  112. }
  113. if(mode == 0)// all
  114. {
  115. sprintf(string + strlen(string), "{\"deviceId\":\"%s\",\"%s\":%d},",
  116. current_device->deviceID, currentModbusParams->keyword, value);
  117. }
  118. else if(mode == 1)// def
  119. {
  120. if((value - currentModbusParams->value) != 0)
  121. {
  122. sprintf(string + strlen(string), "{\"deviceId\":\"%s\",\"%s\":%d},",
  123. current_device->deviceID, currentModbusParams->keyword, value);
  124. def = 1;
  125. }
  126. }
  127. if (currentModbusParams->decimalPoint == 0)
  128. {
  129. currentModbusParams->value = value;
  130. }
  131. else
  132. {
  133. currentModbusParams->value = value / my_pow(10,currentModbusParams->decimalPoint);
  134. }
  135. }
  136. currentModbusParams = currentModbusParams->nextParams;
  137. if (currentModbusParams == NULL)
  138. {
  139. current_device = current_device->nextDevice;
  140. currentModbusParams = current_device->params->gateway_read_modbus_command;
  141. if(current_device == NULL)
  142. {
  143. sprintf(string + strlen(string) - 1, "");
  144. return 1;
  145. }
  146. }
  147. }
  148. }
  149. else if (current_device->protocol == DLT645_2007 || current_device->protocol == DLT645_97)
  150. {
  151. uint8_t read_buf[10];
  152. uint32_t dltValue;
  153. currentDLT645Params->rxLen = 0;
  154. memset(read_buf, 0, 10);
  155. memset(currentDLT645Params->data, 0, 10);
  156. dlt645_set_addr(&dlt645, currentDLT645Params->deviceID645);
  157. int8_t rs;
  158. if (current_device->protocol == DLT645_2007)
  159. {
  160. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_2007);
  161. }
  162. else if (current_device->protocol == DLT645_1997)
  163. {
  164. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_1997);
  165. }
  166. if (rs != -1)
  167. {
  168. if(mode == 0)// all
  169. {
  170. if (rs <= 4)
  171. {
  172. memcpy(currentDLT645Params->data, read_buf, 4);
  173. currentDLT645Params->rxLen = 4;
  174. }
  175. else if (rs == 5)
  176. {
  177. memcpy(currentDLT645Params->data, read_buf, 5);
  178. currentDLT645Params->rxLen = 5;
  179. }
  180. else if (rs > 5)
  181. {
  182. memcpy(currentDLT645Params->data, read_buf, 9);
  183. currentDLT645Params->rxLen = 9;
  184. }
  185. dltValue = currentDLT645Params->data[0] << 24 | currentDLT645Params->data[1] << 16|
  186. currentDLT645Params->data[2] << 8 | currentDLT645Params->data[3];
  187. sprintf(string + strlen(string), "{\"identifier\":\"%s\",\"deviceID645\":\"%02x%02x%02x%02x%02x%02x\",\"identifier645\":%d,\"value\":%X}",
  188. currentDLT645Params->keyword, currentDLT645Params->deviceID645[0],
  189. currentDLT645Params->deviceID645[1],currentDLT645Params->deviceID645[2],
  190. currentDLT645Params->deviceID645[3],currentDLT645Params->deviceID645[4],
  191. currentDLT645Params->deviceID645[5],currentDLT645Params->Identification,dltValue);
  192. }
  193. else if(mode == 1)//def
  194. {
  195. if(compareArrays(read_buf,currentDLT645Params->data,10))// 不相同1,相同0
  196. {
  197. if (rs <= 4)
  198. {
  199. memcpy(currentDLT645Params->data, read_buf, 4);
  200. currentDLT645Params->rxLen = 4;
  201. }
  202. else if (rs == 5)
  203. {
  204. memcpy(currentDLT645Params->data, read_buf, 5);
  205. currentDLT645Params->rxLen = 5;
  206. }
  207. else if (rs > 5)
  208. {
  209. memcpy(currentDLT645Params->data, read_buf, 9);
  210. currentDLT645Params->rxLen = 9;
  211. }
  212. dltValue = currentDLT645Params->data[0] << 24 | currentDLT645Params->data[1] << 16|
  213. currentDLT645Params->data[2] << 8 | currentDLT645Params->data[3];
  214. sprintf(string + strlen(string), "{\"identifier\":\"%s\",\"deviceID645\":\"%02x%02x%02x%02x%02x%02x\",\"identifier645\":%d,\"value\":%X}",
  215. currentDLT645Params->keyword, currentDLT645Params->deviceID645[0],
  216. currentDLT645Params->deviceID645[1],currentDLT645Params->deviceID645[2],
  217. currentDLT645Params->deviceID645[3],currentDLT645Params->deviceID645[4],
  218. currentDLT645Params->deviceID645[5],currentDLT645Params->Identification,dltValue);
  219. def = 1;
  220. }
  221. }
  222. }
  223. currentDLT645Params = currentDLT645Params->nextParams;
  224. if (currentDLT645Params == NULL)
  225. {
  226. current_device = current_device->nextDevice;
  227. currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  228. if(current_device == NULL)
  229. {
  230. sprintf(string + strlen(string) - 1, "");
  231. return 1;
  232. }
  233. }
  234. }
  235. }
  236. return 1;
  237. }
  238. /*
  239. *********************************************************************************************************
  240. * 函 数 名:void WRITE_MODBUS_DATA(char* cJSONstring)
  241. * 功能说明: 接收mqtt数据并写入modbus寄存器
  242. * 形 参:char* cJSONstring mqtt接收到的数据
  243. * 返 回 值: 无
  244. *********************************************************************************************************
  245. */
  246. void write_modbus_data(char* JSON_STRING)
  247. {
  248. JSON_CMD jsonMsg;
  249. GATEWAY_PARAMS* get;
  250. get = get_gateway_config_params();
  251. DEVICE_PARAMS* current_device = get->device_params;
  252. jsonMsg.parameter =parseIntField(JSON_STRING, "\"parameter\":");
  253. parseStringField(JSON_STRING, "\"deviceId\":\"", (char*)&jsonMsg.deviceId);
  254. parseStringField(JSON_STRING, "\"identifier\":\"", (char*)&jsonMsg.identifier);
  255. parseStringField(JSON_STRING, "\"messageId\":\"", (char*)&jsonMsg.messageId);
  256. parseStringField(JSON_STRING, "\"action\":\"", (char*)&jsonMsg.action);
  257. LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"write to mqtt");
  258. while(current_device)
  259. {
  260. char* device_ID = (char*)current_device->deviceID;
  261. GATEWAY_WRITE_MODBUS_COMMAND *currentModbusWriteParams = current_device->params->gateway_write_modbus_command;
  262. GATEWAY_READ_MODBUS_COMMAND *currentModbusReadParams = current_device->params->gateway_read_modbus_command;
  263. char* pubJsonString = mymalloc(SRAMEX,150);
  264. switch(atoi((char*)&jsonMsg.action))
  265. {
  266. case 0:/* write */
  267. if(!strcmp(device_ID,(char*)&jsonMsg.deviceId))
  268. {
  269. while(currentModbusWriteParams != NULL)
  270. {
  271. if(!strcmp((char*)&currentModbusWriteParams->keyword,(char*)&jsonMsg.identifier)) //匹配ID和属性
  272. {
  273. recv_state = 0;
  274. delay_ms(100);
  275. while(mmodbus.done != 1) delay_ms(100); // 等待modbus读取完成,防止粘包
  276. mmodbus_writeHoldingRegister16i(currentModbusWriteParams->slaveAddress, currentModbusWriteParams->registerAddress, jsonMsg.parameter);
  277. sprintf(pubJsonString,"{\"action\":\"%s\",\"identifier\":\"%s\",\"deviceId\":\"%s\",\"messageId\":\"%s\",\"state\":%d}",
  278. jsonMsg.action,jsonMsg.identifier,jsonMsg.deviceId,jsonMsg.messageId,recv_state); // 组成要发送的json语句
  279. mqtt_publish_data(pubJsonString, QOS0, strlen(pubJsonString), (char*)&get->messageTopic);
  280. delay_ms(100);
  281. }
  282. currentModbusWriteParams = currentModbusWriteParams->nextParams;
  283. }
  284. }
  285. break;
  286. case 1:/* read */
  287. if(!strcmp(device_ID,(char*)&jsonMsg.deviceId))
  288. {
  289. while(currentModbusReadParams != NULL)
  290. {
  291. if(!strcmp((char*)&currentModbusReadParams->keyword,(char*)&jsonMsg.identifier)) //匹配ID和属性
  292. {
  293. delay_ms(100);
  294. recv_state = 0;
  295. uint16_t data[currentModbusReadParams->registerByteNum /2]; // modbus寄存器长度
  296. while(mmodbus.done != 1) delay_ms(100); // 等待modbus读取完成,防止粘包
  297. bool success = mmodbus_readHoldingRegisters16i(currentModbusReadParams->slaveAddress,currentModbusReadParams->registerAddress,
  298. currentModbusReadParams->registerByteNum /2,data);
  299. if (success)
  300. {
  301. recv_state = 1;
  302. uint32_t value;
  303. if (currentModbusReadParams->registerByteNum == 4)
  304. {
  305. value = (uint32_t)data[0] | data[1];
  306. }
  307. else if (currentModbusReadParams->registerByteNum == 2)
  308. {
  309. value = data[0];
  310. }
  311. sprintf(pubJsonString,"{\"action\":\"%s\",\"identifier\":\"%s\",\"deviceId\":\"%s\",\"messageId\":\"%s\",\"state\":%d,\"parameter\":%d}",
  312. jsonMsg.action,jsonMsg.identifier,jsonMsg.deviceId,jsonMsg.messageId,recv_state,value); // 组成要发送的json语句
  313. mqtt_publish_data(pubJsonString, QOS0, strlen(pubJsonString), (char*)&get->messageTopic);
  314. delay_ms(100);
  315. }
  316. }
  317. currentModbusReadParams = currentModbusReadParams->nextParams;
  318. }
  319. }
  320. break;
  321. case 3:/* reboot */
  322. __set_PRIMASK(1);
  323. NVIC_SystemReset();
  324. break;
  325. }
  326. current_device = current_device->nextDevice;
  327. myfree(SRAMEX, pubJsonString);
  328. }
  329. }
  330. // 重定义pow函数
  331. uint32_t my_pow(int base, int exponent) {
  332. uint32_t result = 1;
  333. for(int i = 0; i < exponent; i++) {
  334. result *= base;
  335. }
  336. return result;
  337. }
  338. void protocolsModeFunc(GATEWAY_PARAMS* get, char* string)
  339. {
  340. // if(mqtt_connectFlag)
  341. // {
  342. HAL_GPIO_WritePin(GPIOF,LED_PIN,GPIO_PIN_SET);
  343. time1 = GetCurrentTime();
  344. sprintf(string,"{\"deviceId\":\"%s\",\"data\":[",get->deviceId); // 组成要发送的json语句
  345. if(startFlag && time2 <= time1 - ( 10 * 1000))// 10s进行一次全数据发送
  346. {
  347. mode = 0;//all
  348. LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"protocolsMode:All data");
  349. read_device_data(get->device_params,string);
  350. sprintf(string + strlen(string),"]}");
  351. mqtt_publish_data(string, QOS0, strlen(string), (char*)&get->messageTopic);
  352. time2 = GetCurrentTime();
  353. }
  354. else
  355. {
  356. mode = 1;// def
  357. read_device_data(get->device_params, string);
  358. if(def)// 检测string是否含有数据
  359. {
  360. def = 0;
  361. LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"protocolsMode:Different data");
  362. sprintf(string + strlen(string),"]}");
  363. mqtt_publish_data(string, QOS0, strlen(string), (char*)&get->messageTopic);
  364. time2 = GetCurrentTime();
  365. }
  366. }
  367. startFlag = 1;
  368. memset(string,0,strlen(string));
  369. // }
  370. }
  371. int transparent_data(DEVICE_PARAMS *device, char* string)
  372. {
  373. DEVICE_PARAMS *current_device=device;
  374. GATEWAY_READ_MODBUS_COMMAND *currentModbusParams = current_device->params->gateway_read_modbus_command;
  375. GATEWAY_READ_DLT645_COMMAND *currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  376. while(current_device->params != NULL)
  377. {
  378. HAL_GPIO_WritePin(GPIOF,LED_PIN,GPIO_PIN_SET);
  379. if (current_device->protocol == MODBUS)
  380. {
  381. uint16_t data[currentModbusParams->registerByteNum /2]; // modbus寄存器长度
  382. mmodbus_set16bitOrder(current_device->MDBbigLittleFormat);
  383. // 读单个寄存器
  384. if (currentModbusParams->functionCode == 0x03)
  385. {
  386. // bool success = mmodbus_readHoldingRegisters16i(0x17,0x00,0x02,data);
  387. bool success = mmodbus_readHoldingRegisters16i(currentModbusParams->slaveAddress,
  388. currentModbusParams->registerAddress,
  389. currentModbusParams->registerByteNum /2,
  390. data);
  391. if (success)
  392. {
  393. memcpy(string + transparent_lenth, mmodbus.rxBuf, mmodbus.rxIndex);
  394. transparent_lenth += mmodbus.rxIndex;
  395. }
  396. currentModbusParams = currentModbusParams->nextParams;
  397. if (currentModbusParams == NULL)
  398. {
  399. current_device = current_device->nextDevice;
  400. currentModbusParams = current_device->params->gateway_read_modbus_command;
  401. if(current_device == NULL)
  402. {
  403. return 1;
  404. }
  405. }
  406. }
  407. }
  408. else if (current_device->protocol == DLT645_2007 || current_device->protocol == DLT645_97)
  409. {
  410. uint8_t read_buf[10];
  411. memset(read_buf, 0, 10);
  412. memset(currentDLT645Params->data, 0, 10);
  413. dlt645_set_addr(&dlt645, currentDLT645Params->deviceID645);
  414. int8_t rs;
  415. if (current_device->protocol == DLT645_2007)
  416. {
  417. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_2007);
  418. }
  419. else if (current_device->protocol == DLT645_1997)
  420. {
  421. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_1997);
  422. }
  423. if (rs != -1)
  424. {
  425. sprintf(string, "%s", read_buf);
  426. }
  427. currentDLT645Params = currentDLT645Params->nextParams;
  428. if (currentDLT645Params == NULL)
  429. {
  430. current_device = current_device->nextDevice;
  431. currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  432. if(current_device == NULL)
  433. {
  434. return 1;
  435. }
  436. }
  437. }
  438. HAL_GPIO_WritePin(GPIOF,LED_PIN,GPIO_PIN_RESET);
  439. }
  440. return 1;
  441. }
  442. void transparentModeFunc(GATEWAY_PARAMS* get, char* string)
  443. {
  444. LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"transparentMode:All data");
  445. printf("transparentMode:All data\n");
  446. memset(string,0,transparent_lenth);
  447. transparent_lenth = 0; // 记录string接收大小
  448. transparent_data(get->device_params, string);
  449. mqtt_publish_data(string, QOS0, transparent_lenth, (char*)&get->messageTopic);
  450. }