sys_mqtt.c 9.9 KB


  1. #include "string.h"
  2. #include "stm32f2xx.h"
  3. #include "ucos_ii.h"
  4. #include "hd_eth.h"
  5. #include "lwip/arch.h"
  6. #include "sys_mqtt.h"
  7. #include "transport.h"
  8. #include "MQTTFormat.h"
  9. #include "cJSON.h"
  10. #include "led.h"
  11. #include "MQTTClient.h"
  12. #include "malloc.h"
  13. #include "mmodbus.h"
  14. #include "myFile.h"
  15. #include "gateway_message.h"
  16. #include "task.h"
  17. /********************************************************
  18. * @brief 连接到服务器
  19. * @param sock: sock编号
  20. *********************************************************/
  21. int mqtt_userConnect(void)
  22. {
  23. GATEWAY_PARAMS* get;
  24. get = get_gateway_config_params();
  25. char* MQTT_SERVER_ADDR = (char*)get->host;
  26. int MQTT_SERVER_PORT = get->port;
  27. int sock = -1;
  28. //连接到tcp服务器
  29. sock = transport_open(MQTT_SERVER_ADDR, MQTT_SERVER_PORT);
  30. if(sock < 0)
  31. {
  32. MQTT_PRINTF("connect tcp server error \r\n");
  33. return -1;
  34. }
  35. MQTT_PRINTF("connect tcp server success \r\n");
  36. //连接到mqtt服务器
  37. if(mqtt_connectToMqttServer(sock) <= 0)
  38. {
  39. MQTT_PRINTF("connect mqtt server error \r\n");
  40. return -1;
  41. }
  42. MQTT_PRINTF("connect mqtt server success \r\n");
  43. return sock;
  44. }
  45. /************************************************************
  46. * @brief 订阅主题
  47. * @param sock: sock编号
  48. * @retval 1: 订阅成功
  49. *************************************************************/
  50. int mqtt_userSubscribeTopic(int sock)
  51. {
  52. GATEWAY_PARAMS* get;
  53. get = get_gateway_config_params();
  54. char* TOPICPC = (char*)get->commandTopic;
  55. return mqtt_subscribeTopic(sock, TOPICPC, 2);
  56. }
  57. /*
  58. *接收mqtt下发消息并存储进一个数据队列
  59. *name MQTT主题相关信息
  60. */
  61. void *json_message[10];
  62. #define QUEUE_SIZE 10 //队列深度
  63. //消息队列指针
  64. OS_EVENT *JsonQ;
  65. void mqtt_outputMsg(MQTTString *name, uint8_t *msgbuf, int msglen, uint16_t id, int qos)
  66. {
  67. int lenght=msglen;
  68. MQTT_PRINTF("receive a msg: id=%d qos=%d topic=%s msg=%s \r\n", id, qos, name->lenstring.data, msgbuf);
  69. StringInfo *message=mymalloc(SRAMIN ,sizeof(StringInfo));
  70. message->stringLength=msglen;
  71. message->p=mymalloc(SRAMIN ,msglen+1);
  72. memcpy(message->p,msgbuf,msglen);
  73. uint8_t err;
  74. OSTimeDly(1500);
  75. err=OSQPost(JsonQ,message);
  76. commd = 0;
  77. switch(err)
  78. {
  79. case OS_ERR_NONE:
  80. break;
  81. case OS_ERR_Q_FULL:
  82. MQTT_PRINTF("receive a msg queue is full \r\n");
  83. break;
  84. default:
  85. break;
  86. }
  87. }
  88. /************************************************************
  89. * @brief 处理来自平台发布的信息
  90. * @param sock: sock编号
  91. pbuf: 接收的数据
  92. buflen: 数据的长度
  93. * @retval 1: 消息正常处理
  94. * -2: 帧数据包错误
  95. *************************************************************/
  96. int mqtt_recvPublishMessage(int sock, uint8_t *pbuf, int buflen)
  97. {
  98. uint8_t dup, retained;
  99. int qos, payloadlen;
  100. uint16_t packetid;
  101. MQTTString topicName;
  102. uint8_t *payload;
  103. int msg;
  104. if(MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, &payload, &payloadlen, pbuf, buflen) == 1)
  105. {
  106. mqtt_outputMsg(&topicName, payload, payloadlen, packetid, qos);
  107. }
  108. else return -2;
  109. if(qos == 1)
  110. {
  111. mqtt_recvPublishQos1_packid = packetid;
  112. msg = MBOX_MQTT_QOS1PUBACK;
  113. OSMboxPost(mqtt_sendMseeageMbox, &msg);
  114. }
  115. if(qos == 2)
  116. {
  117. mqtt_recvPublishQos2_packid = packetid;
  118. msg = MBOX_MQTT_QOS2PUBREC;
  119. OSMboxPost(mqtt_sendMseeageMbox, &msg);
  120. }
  121. return 1;
  122. }
  123. /************************************************************
  124. * @brief 解析所有平台下发的数据包
  125. * @param sock: sock编号
  126. type: 数据帧的类型
  127. pbuf: 数据
  128. buflen: 数据的长度
  129. * @retval 1: 消息正常处理
  130. * -2: 帧数据包错误
  131. *************************************************************/
  132. int mqtt_userReceiveMessage(int sock, int type, uint8_t *pbuf, int len)
  133. {
  134. int rc;
  135. switch(type)
  136. {
  137. case PUBLISH: rc = mqtt_recvPublishMessage(sock, pbuf, len); break; //平台向设备发布消息
  138. case PUBACK: rc = mqtt_publishMessage_qos1_PUBACK(pbuf, len); break; //设备向平台发布 QOS1 确认报文
  139. case PUBREC: rc = mqtt_publishMessage_qos2_PUBREC(pbuf, len); break; //设备向平台发布 QOS2 确认报文
  140. case PUBREL: rc = mqtt_recvPublishMessage_qos2_PUBREL(sock, pbuf, len); break; //平台向设备发布 QOS2 消息释放报文
  141. case PUBCOMP: rc = mqtt_publishMessage_qos2_PUBCOMP(pbuf, len); break; //设备向平台发布 QOS2 消息释放报文的确认报文
  142. case SUBACK: rc = mqtt_subscribeTopic_SUBACK(pbuf, len);break; //设备向平台订阅报文时的响应报文
  143. case UNSUBACK: rc = mqtt_unSubscribeTopic_UNSUBACK(pbuf, len); break; //设备向平台发取消订阅报文时的响应报文
  144. case PINGRESP: rc = mqtt_pingResponse(); break; //心跳响应报文
  145. default: rc = 1; break;
  146. }
  147. return rc;
  148. }
  149. /************************************************************
  150. * @brief 处理发送邮箱 mqtt_recvMbox 中的消息,根据消息的类型,发送对应的数据包
  151. * @param sock: sock编号
  152. boxMsg: 消息内容
  153. * @retval 1: 消息正常处理
  154. * -1: 网络错误
  155. * -2: 组建发送数据时发送错误
  156. *************************************************************/
  157. char pubJsonString[jsonMaxSize];
  158. int mqtt_userSendMessage(int sock, int boxMsg)
  159. {
  160. GATEWAY_PARAMS* get;
  161. get = get_gateway_config_params();
  162. char* TOPIC = (char*)get->messageTopic;
  163. int rc = 1;
  164. //用户发送的数据
  165. if((boxMsg & 0xf0000000) == 0x20000000)
  166. {
  167. switch(boxMsg)
  168. {
  169. case MBOX_USER_PUBLISHQOS0: rc = mqtt_publishMessage_qos0(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
  170. case MBOX_USER_PUBLISHQOS1: rc = mqtt_publishMessage_qos1(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
  171. case MBOX_USER_PUBLISHQOS2: rc = mqtt_publishMessage_qos2(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
  172. }
  173. }
  174. //协议站使用的数据
  175. else if((boxMsg & 0xf0000000) == 0x10000000)
  176. {
  177. switch(boxMsg)
  178. {
  179. case MBOX_MQTT_QOS1PUBACK: rc = mqtt_recvPublishMessage_qos1_PUBACK(sock, mqtt_recvPublishQos1_packid); break;
  180. case MBOX_MQTT_QOS2PUBREC: rc = mqtt_recvPublishMessage_qos2_PUBREC(sock, mqtt_recvPublishQos2_packid); break;
  181. case MBOX_MQTT_QOS2PUBCOMP: rc = mqtt_recvPublishMessage_qos2_PUBCOMP(sock, mqtt_recvPublishQos2_packid); break;
  182. }
  183. }
  184. return rc;
  185. }
  186. OS_EVENT *mqtt_sendMseeageMbox; //发送消息邮箱
  187. int mqtt_connectFlag; //成功连接服务器标志
  188. static int mysock; //连接mqtt服务器的sock编号
  189. /************************************************************
  190. * @brief MQTT主线程
  191. * @param arg: 未使用
  192. * @retval
  193. *************************************************************/
  194. void mqtt_userManThread(void *arg)
  195. {
  196. int rc;
  197. uint8_t err;
  198. void *mboxMsg;
  199. MQTT_PRINTF("mqtt mainthread start \r\n");
  200. __MQTT_START:
  201. //1.建立与服务器的连接
  202. mysock = -1;
  203. mqtt_connectFlag = 0;
  204. while(mysock < 0)
  205. {
  206. mysock = mqtt_userConnect();
  207. OSTimeDly(2000);
  208. }
  209. OSMboxAccept(mqtt_sendMseeageMbox); //清空mbox的数据
  210. mqtt_connectFlag = 1;
  211. //2.订阅服务器的主题
  212. rc = mqtt_userSubscribeTopic(mysock);
  213. if(rc <= 0)
  214. {
  215. MQTT_PRINTF("subscribe error \r\n");
  216. if(rc == -1) goto __MQTT_START; //如果网络发生错误,重新建立连接
  217. }
  218. else MQTT_PRINTF("subscribe success \r\n");
  219. //3.循环发送数据
  220. while(1)
  221. {
  222. mboxMsg = OSMboxPend(mqtt_sendMseeageMbox, 60000, &err);
  223. if(OS_ERR_NONE == err)
  224. {
  225. //接收到了 网络异常消息
  226. if((*(unsigned int *)mboxMsg) == MBOX_NETWORK_ERROR) goto __MQTT_START;
  227. else
  228. {
  229. if(mqtt_userSendMessage(mysock, *(unsigned int *)mboxMsg) == -1)
  230. goto __MQTT_START;
  231. memset(pubJsonString,0,strlen(pubJsonString));
  232. }
  233. }
  234. else //超时没有发送数据包,发送心跳包
  235. {
  236. rc = mqtt_pingReq(mysock);
  237. if(rc == -1) goto __MQTT_START;
  238. }
  239. OSTimeDly(1);
  240. }
  241. }
  242. /************************************************************
  243. * @brief MQTT接收线程,处理所有平台下发的数据帧, CONNACK除外
  244. * @param arg: 未使用
  245. * @retval
  246. *************************************************************/
  247. void mqtt_userReceiveThread(void *arg)
  248. {
  249. int len;
  250. int packetType;
  251. int msg;
  252. MQTT_PRINTF("mqtt receivethread start \r\n");
  253. while(1)
  254. {
  255. if(mqtt_connectFlag == 1)
  256. {
  257. len = transport_receive(mysock); //阻塞接收数据
  258. if(len <=0)
  259. {
  260. if(len == EWOULDBLOCK) //接收数据超时,重新接收
  261. {
  262. MQTT_PRINTF("receive data timeout \r\n");
  263. continue;
  264. }
  265. else //接收数据时,网络发生了异常,给主线程发送消息,重新建立连接
  266. {
  267. MQTT_PRINTF("sock close \r\n");
  268. transport_close(mysock);
  269. mqtt_connectFlag = 0;
  270. msg = MBOX_NETWORK_ERROR;
  271. OSMboxPost(mqtt_sendMseeageMbox, &msg);
  272. }
  273. }
  274. memset(mqtt_recvbuffer, 0, MQTT_RECVBUF_LENTH);
  275. packetType = MQTTPacket_read(mqtt_recvbuffer, len, transport_getdata);
  276. mqtt_userReceiveMessage(mysock, packetType, mqtt_recvbuffer, len);
  277. }
  278. OSTimeDly(1);
  279. }
  280. }
  281. #define APP_TASK_MQTTMAIN_PRIO 6
  282. #define APP_TASK_MQTTMAIN_STK_SIZE 1024
  283. static OS_STK mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE];
  284. #define APP_TASK_MQTTRECEIVE_PRIO 7
  285. #define APP_TASK_MQTTRECEIVE_STK_SIZE 1024
  286. static OS_STK mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE];
  287. /************************************************************
  288. * @brief 创建MQTT发送和接收线程,初始化相关数据
  289. * @param 此函数必须在LWIP协议栈初始完成以后再调用
  290. * @retval
  291. *************************************************************/
  292. void mqtt_threadCreate(void)
  293. {
  294. mqtt_connectFlag = 0;
  295. mqtt_sendMseeageMbox = OSMboxCreate(NULL);
  296. JsonQ = OSQCreate(json_message[0], QUEUE_SIZE); //创建json队列
  297. OSTaskCreate(mqtt_userManThread, NULL, &mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE-1], APP_TASK_MQTTMAIN_PRIO);
  298. OSTaskCreate(mqtt_userReceiveThread, NULL, &mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE-1], APP_TASK_MQTTRECEIVE_PRIO);
  299. }