sys_mqtt.c 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. /*
  2. * @Author: jiejie
  3. * @Github: https://github.com/jiejieTop
  4. * @Date: 2019-12-11 21:53:07
  5. * @LastEditTime : 2022-06-15 23:03:30
  6. * @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
  7. */
  8. #include <stdio.h>
  9. #include <stdlib.h>
  10. #include "mqtt_config.h"
  11. #include "mqtt_log.h"
  12. #include "sys_mqtt.h"
  13. void mqtt_publish_task(void *arg);
  14. static void topic1_handler(void* client, message_data_t* msg)
  15. {
  16. (void) client;
  17. MQTT_LOG_I("-----------------------------------------------------------------------------------");
  18. MQTT_LOG_I("%s:%d %s()...\ntopic: %s\nmessage:%s", __FILE__, __LINE__, __FUNCTION__, msg->topic_name, (char*)msg->message->payload);
  19. MQTT_LOG_I("-----------------------------------------------------------------------------------");
  20. }
  21. /*
  22. *初始化mqtt连接
  23. */
  24. int8_t mqtt_init(mqtt_client_t *client,char *clientId,char *user_name,char *password,char *ip,char *port,uint16_t keepAlive)
  25. {
  26. mqtt_set_client_id(client,clientId);
  27. mqtt_set_port(client, port);
  28. mqtt_set_host(client, ip);
  29. mqtt_set_user_name(client, user_name);
  30. mqtt_set_password(client, password);
  31. mqtt_set_clean_session(client, 1);
  32. mqtt_set_keep_alive_interval(client,keepAlive);
  33. return mqtt_connect(client);
  34. }
  35. //发送数据队列句柄
  36. QueueHandle_t xQueue1;
  37. void mqtt_task_creat()
  38. {
  39. mqtt_client_t *client = NULL;//创建一个客户端
  40. mqtt_log_init();
  41. client = mqtt_lease();
  42. printf("\nwelcome to mqttclient test...\n");
  43. xQueue1=xQueueCreate(10,sizeof( struct Pub_Queue * )); //创建一个mqtt上传的队列
  44. xTaskCreate(mqtt_publish_task, "mqtt_publish_task",1024, client, 2, NULL);
  45. }
  46. void mqtt_publish_task(void *arg)
  47. {
  48. mqtt_client_t *client = (mqtt_client_t *)arg;
  49. while(1)
  50. {
  51. int rc=mqtt_init(client,"client_id_001",NULL,NULL,"36.134.23.11","1883",1000);
  52. vTaskDelay(100);
  53. }
  54. mqtt_subscribe(client,"sub_topic_task", QOS0, topic1_handler);
  55. mqtt_message_t msg;
  56. memset(&msg, 0, sizeof(msg));
  57. mqtt_list_subscribe_topic(client);
  58. struct Pub_Queue *pxMessage;
  59. while(1) {
  60. if(xQueue1 !=0)
  61. {
  62. if(xQueueReceive(xQueue1,&(pxMessage),( TickType_t )10))
  63. {
  64. //处理队列内部数据并上传
  65. msg.payloadlen=pxMessage->pubLength;
  66. msg.qos=pxMessage->qos;
  67. msg.payload=(void *)pxMessage->message;
  68. mqtt_publish(client,pxMessage->pub_topic,&msg);
  69. vPortFree(pxMessage->message);
  70. vPortFree(pxMessage->pub_topic);
  71. vPortFree(pxMessage);
  72. }
  73. }
  74. vTaskDelay(100);
  75. }
  76. }
  77. void mqtt_publish_data(uint8_t *payload,mqtt_qos_t qos,uint16_t pub_length,char *topic)
  78. {
  79. struct Pub_Queue *pxMessage=pvPortMalloc(sizeof(struct Pub_Queue));
  80. pxMessage->message=pvPortMalloc(pub_length+1);
  81. memset(pxMessage->message,0,(pub_length+1));
  82. memcpy(pxMessage->message,payload,pub_length);
  83. pxMessage->pub_topic=pvPortMalloc(strlen(topic)+1);
  84. memset(pxMessage->pub_topic,0,(strlen(topic)+1));
  85. strcpy(pxMessage->pub_topic,topic);
  86. xQueueSend( xQueue1, (void *)&pxMessage, ( TickType_t ) 0 );
  87. }