#include "LTE_MQTT.h"
#include "lte.h"
#include "stdint.h"
#include "data_task.h"
#include <stdio.h>
#include <string.h>

/* Tunables shared by the MQTT AT-command helpers in this file. */
enum {
    MQTT_CMD_MAX_RETRIES  = 2000, /* attempts before an AT command is abandoned */
    MQTT_CMD_BUF_LEN      = 64,   /* size of the scratch buffer for one AT command */
    MQTT_PROMPT_MAX_POLLS = 20,   /* polls for the '>' prompt before giving up */
    MQTT_PROMPT_POLL_MS   = 100   /* delay between two '>' prompt polls */
};

/**
 * Send one AT command, repeating it until Iot_SendCmd() returns 0 or the
 * retry budget (MQTT_CMD_MAX_RETRIES) is used up.
 *
 * @param cmd   NUL-terminated AT command, passed unchanged to Iot_SendCmd().
 * @param ack   Expected acknowledgement, passed unchanged to Iot_SendCmd().
 * @param wait  Third argument of Iot_SendCmd(), passed unchanged
 *              (presumably a response wait time - confirm in lte.h).
 * @return 0 once Iot_SendCmd() returned 0, -1 when the retry budget ran out.
 */
static int mqtt_cmd_retry(char *cmd, char *ack, int wait)
{
    uint16_t attempts = 0;

    while (Iot_SendCmd(cmd, ack, wait)) {
        LTE_Delay(1);
        attempts++;
        if (attempts >= MQTT_CMD_MAX_RETRIES) {
            return -1;
        }
    }
    return 0;
}

/**
 * Configure the MQTT parameters of the modem for client `client_send_idx`.
 *
 * Sends the "version", "keepalive", "session", "timeout" and "will"
 * AT+QMTCFG commands in that order, each one retried until acknowledged
 * with "OK".
 *
 * @return SUCCESS when every command was acknowledged, TIMEOUT as soon as
 *         one command exhausted its retry budget.
 */
int MQTT_config(void)
{
    char cmd[MQTT_CMD_BUF_LEN];

    snprintf(cmd, sizeof(cmd), "AT+QMTCFG=\"version\",%d,%d \r\n", client_send_idx, MQTT_4);
    if (mqtt_cmd_retry(cmd, "OK", 1) != 0) {
        return TIMEOUT;
    }

    snprintf(cmd, sizeof(cmd), "AT+QMTCFG=\"keepalive\",%d,120 \r\n", client_send_idx);
    if (mqtt_cmd_retry(cmd, "OK", 1) != 0) {
        return TIMEOUT;
    }

    /* Application-specific parameters. */
    snprintf(cmd, sizeof(cmd), "AT+QMTCFG=\"session\",%d,1 \r\n", client_send_idx);
    if (mqtt_cmd_retry(cmd, "OK", 1) != 0) {
        return TIMEOUT;
    }

    snprintf(cmd, sizeof(cmd), "AT+QMTCFG=\"timeout\",%d,10,3,1 \r\n", client_send_idx);
    if (mqtt_cmd_retry(cmd, "OK", 1) != 0) {
        return TIMEOUT;
    }

    snprintf(cmd, sizeof(cmd), "AT+QMTCFG=\"will\",%d \r\n", client_send_idx);
    if (mqtt_cmd_retry(cmd, "OK", 1) != 0) {
        return TIMEOUT;
    }

    return SUCCESS;
}

/**
 * Publish one payload: transmit the publish command, wait for the '>'
 * prompt to show up in UART6_RX_BUF, then transmit the payload itself.
 *
 * @param message  NUL-terminated publish command (e.g. AT+QMTPUBEX=...).
 * @param buf      NUL-terminated payload; its strlen() is what gets sent.
 * @return 1 when the payload was transmitted, 0 when an argument is NULL
 *         or the prompt did not appear within the polling budget.
 */
uint8_t mqtt_pulish(char* message, void* buf )
{
    /* Must start at 0: the original left this counter uninitialised, which
     * made the timeout length arbitrary (and reading it undefined). */
    uint8_t polls = 0;

    if (message == NULL || buf == NULL) {
        return 0; /* failure */
    }

    HAL_UART_Transmit(&USART_InitStruct_DEBUG, (uint8_t*)message,
                      (uint16_t)strlen(message), HAL_MAX_DELAY);

    while (strstr((char*)UART6_RX_BUF, ">") == NULL) {
        polls++;
        delay_ms(MQTT_PROMPT_POLL_MS);
        if (polls > MQTT_PROMPT_MAX_POLLS) {
            return 0; /* failure */
        }
    }

    HAL_UART_Transmit(&USART_InitStruct_DEBUG, (uint8_t*)buf,
                      (uint16_t)strlen((const char*)buf), HAL_MAX_DELAY);
    return 1; /* success */
}

/**
 * MQTT task, send mode.
 *
 * Brings the session up (send mode -> open -> connect -> subscribe),
 * restarting the whole sequence whenever one step exhausts its retries,
 * then sets mqtt_connectFlag and publishes every message received on
 * xQueue1 forever. Each queue item and its two buffers are released with
 * myfree(SRAMEX, ...) after the publish attempt.
 *
 * @param arg  Unused thread argument.
 */
void MQTT_send_connect(void const* arg)
{
    char message[MQTT_CMD_BUF_LEN];
    GATEWAY_PARAMS *get;
    struct Pub_Queue *pxMessage;

    (void)arg;

    /* NOTE(review): the original had "// MQTT_config();" here, read as a
     * disabled call - confirm whether the configuration step should run. */

    for (;;) {
        int conn_index = -1;
        int conn_result = -1;
        char *urc;

        /* Select the mode. */
        if (mqtt_cmd_retry("AT+QMTCFG=\"send/mode\",0,0 \r\n", "OK", 1) != 0) {
            continue;
        }

        get = get_gateway_config_params();

        /* Open the MQTT network.
         * NOTE(review): commands are silently truncated to the
         * MQTT_CMD_BUF_LEN-byte buffer - confirm host/credential/topic
         * lengths always fit. */
        snprintf(message, sizeof(message), "AT+QMTOPEN=%d,%s,%d \r\n",
                 client_send_idx, get->host, get->port);
        if (mqtt_cmd_retry(message, "OK", 20) != 0) {
            continue;
        }

        /* Connect to the server. */
        mqtt_connectFlag = 0;
        snprintf(message, sizeof(message), "AT+QMTCONN=%d,%s,%s,%s\r\n",
                 client_send_idx, (char*)&clientid1, get->username, get->passwd);
        if (mqtt_cmd_retry(message, "OK", 10) != 0) {
            continue;
        }

        /* The URC may not be in the buffer yet; strstr() then returns NULL
         * and must not be handed to sscanf(). */
        urc = strstr((char*)UART6_RX_BUF, "+QMTCONN:");
        if (urc != NULL) {
            sscanf(urc, "+QMTCONN: %d,%d, ", &conn_index, &conn_result);
        }
        (void)conn_index;
        (void)conn_result;
#if 0
        /* Decide on the returned result code. */
        switch (conn_result) {
        case NETSUCCESS:
            break;
        case NETERR:
            continue;
        case 1:
            continue;
        }
#endif

        /* Subscribe to the command topic. */
        snprintf(message, sizeof(message), "AT+QMTSUB=%d,1,%s,0 \r\n",
                 client_send_idx, get->commandTopic);
        if (mqtt_cmd_retry(message, "OK", 5) != 0) {
            continue;
        }

        break;
    }

    mqtt_connectFlag = 1;

    /* Publish messages. */
    while (1) {
        if (xQueue1 != 0 && xQueueReceive(xQueue1, &(pxMessage), (TickType_t)10)) {
            /* Take the queued data and upload it. */
            char *payload = (char *)pxMessage->message;

            if (payload != NULL) {
                /* strlen() yields size_t; cast so it matches "%d". */
                snprintf(message, sizeof(message), "AT+QMTPUBEX=%d,0,0,0,%s,%d\r\n",
                         client_send_idx, get->messageTopic, (int)strlen(payload));
                mqtt_pulish(message, payload);
            }
            memset(UART6_RX_BUF, 0, BUFF_SIZE);

            myfree(SRAMEX, pxMessage->pub_topic);
            myfree(SRAMEX, pxMessage->message);
            myfree(SRAMEX, pxMessage);
        }
        /* watchdog_feed() was disabled here in the original. */
        /* Delay on every pass, including when xQueue1 is NULL, so the task
         * never busy-spins. */
        LTE_Delay(100);
    }
}

/**
 * MQTT task, receive mode.
 *
 * Waits until mqtt_connectFlag is 1, selects the receive mode, requests
 * stored messages once with AT+QMTRECV, then polls UART6_RX_BUF every
 * 100 ms for a "+QMTRECV:" report and forwards the JSON payload to
 * write_modbus_data(). The buffer is cleared after every poll.
 *
 * @param arg  Unused thread argument.
 */
void MQTT_recv_connect(void const* arg)
{
    (void)arg;

    while (1) {
        if (mqtt_connectFlag == 1) {
            char *cmd = "AT+QMTRECV=0,1\r\n";

            /* Select the mode; as in the original, carry on even when the
             * command is never acknowledged. */
            (void)mqtt_cmd_retry("AT+QMTCFG=\"recv/mode\",1,0,1 \r\n", "OK", 1);

            /* Receive messages. */
            memset(UART6_RX_BUF, 0, BUFF_SIZE);
            HAL_UART_Transmit(&USART_InitStruct_DEBUG, (uint8_t*)cmd,
                              (uint16_t)strlen(cmd), HAL_MAX_DELAY);

            while (1) {
                /* No heap buffer is needed: the payload is used in place
                 * inside UART6_RX_BUF. The original allocated 1024 bytes per
                 * poll and immediately overwrote the pointer, leaking it. */
                char *report = strstr((char*)UART6_RX_BUF, "+QMTRECV:");

                if (report != NULL) {
                    char *json = strstr(report, "{");

                    if (json != NULL) {
                        size_t len = strlen(json);

                        if (len >= 3) {
                            /* Replace the last three characters with a single
                             * space (presumably the closing quote and CRLF of
                             * the report - confirm against the modem output). */
                            json[len - 3] = ' ';
                            json[len - 2] = '\0';
                            write_modbus_data(json);
                        }
                    }
                }

                /* NOTE(review): UART6_RX_BUF is also cleared by the send task;
                 * confirm the two tasks cannot clobber each other's data. */
                memset(UART6_RX_BUF, 0, BUFF_SIZE);
                LTE_Delay(100);
            }
        }
        /* watchdog_feed() was disabled here in the original. */
        LTE_Delay(100);
    }
}

#if 0
/* Disabled legacy send task (mailbox based), kept for reference. */
void MQTT_send_connect(void* arg)
{
    uint16_t cat1_timeout = 0;
    void *mboxMsg;
    uint8_t err;
    char message[64];
__Start:
    memset(message, 0, sizeof(message));
    snprintf(message, sizeof(message), "AT+QMTCFG=\"recv/mode\",%d,0,1 \r\n",client_recv_idx);
    while(Iot_SendCmd(message,"OK", 5)){
        LTE_Delay(1);
        cat1_timeout ++;
        if(cat1_timeout >= 2000){
            goto __Start;
        }
    }
    // Open the MQTT network
    cat1_timeout = 0;
    snprintf(message, sizeof(message),"AT+QMTOPEN=%d,\"36.134.23.11\",1883 \r\n",client_recv_idx);
    while(Iot_SendCmd(message,"OK",20)){
        LTE_Delay(1);
        cat1_timeout ++;
        if(cat1_timeout >= 2000){
            goto __Start;
        }
    }
    // Connect to the server
    mqtt_connectFlag = 0;
    cat1_timeout = 0;
    memset(message, 0, sizeof(message));
    snprintf(message, sizeof(message), "AT+QMTCONN=%d,%s\r\n",client_send_idx,(char*)&clientid1);
    while(Iot_SendCmd(message,"OK",10)){
        LTE_Delay(1);
        cat1_timeout ++;
        if(cat1_timeout >= 2000){
            goto __Start;
        }
    }
    // Subscribe to the topic
    cat1_timeout = 0;
    memset(message, 0, sizeof(message));
    snprintf(message, sizeof(message), "AT+QMTSUB=%d,1,\"test0003/command\",0 \r\n",client_send_idx);
    while(Iot_SendCmd(message,"OK",10)){
        LTE_Delay(1);
        cat1_timeout ++;
        if(cat1_timeout >= 2000){
            goto __Start;
        }
    }
    while(1)
    {
        mboxMsg = OSMboxPend(mqtt_sendMseeageMbox, 1000, &err);
        if(OS_ERR_NONE == err)
        {
            if((*(unsigned int *)mboxMsg) == MBOX_NETWORK_ERROR)
                goto __Start;
            else
            {
                cat1_timeout = 0;
                memset(message, 0, sizeof(message));
                snprintf(message, sizeof(message), "AT+QMTPUBEX=%d,0,0,0,\"test0003\",%d\r\n",client_send_idx,strlen(pubJsonString));
                while(Iot_SendCmd(message,">",2)){
                    LTE_Delay(1);
                    cat1_timeout ++;
                    if(cat1_timeout >= 2000){
                        goto __Start;
                    }
                }
                cat1_timeout = 0;
                while(Iot_SendCmd(pubJsonString,"+QMTPUBEX", 5)){
                    LTE_Delay(1);
                    cat1_timeout ++;
                    if(cat1_timeout >= 2000){
                        goto __Start;
                    }
                }
                memset(pubJsonString, 0, strlen(pubJsonString));
            }
        }
        LTE_Delay(100);
    }
}
#endif

/**
 * Initialise MQTT logging, create the publish queue (10 slots, each a
 * `struct Pub_Queue *`) and start the send and receive tasks.
 */
void MQ_threadCreate(void)
{
    mqtt_log_init();
    printf("\nwelcome to mqttclient test...\n");

    /* Queue feeding the MQTT upload (publish) task. */
    xQueue1 = xQueueCreate(10, sizeof(struct Pub_Queue *));

    osThreadDef(MQTT_send_task, MQTT_send_connect, osPriorityNormal, 0, configMINIMAL_STACK_SIZE * 8);
    osThreadCreate(osThread(MQTT_send_task), NULL);

    osThreadDef(MQTT_recv_task, MQTT_recv_connect, osPriorityNormal, 0, configMINIMAL_STACK_SIZE * 8);
    osThreadCreate(osThread(MQTT_recv_task), NULL);
}