不到百行Demo看清MQTT收发处理逻辑
Posted coding码场
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了不到百行Demo看清MQTT收发处理逻辑相关的知识,希望对你有一定的参考价值。
static void messageArrived(MessageData* data)
mqtt_printf(MQTT_INFO, "Message arrived on topic %s: %s\\n",data->topicName->lenstring.data,(char*)data->message->payload);
/**/
memset((char*)data->message->payload,0,(char*)data->message->payloadlen);//����
void MQTTTaskDemo(void *pvParameters)
/* To avoid gcc warnings */
( void ) pvParameters;
/* connect to gpssensor.ddns.net, subscribe to a topic, send and receive messages regularly every 5 sec */
MQTTClient client;
Network network;
unsigned char sendbuf[512], readbuf[80];
int rc = 0, count = 0;
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
// char* address = "www.xuhongv.com";
// char* sub_topic = "LASS/Test/Pm25Ameba/cc";
// char* pub_topic = "LASS/Test/Pm25Ameba/FT1_018";
connectData.MQTTVersion = 4;
connectData.clientID.cstring = "56";
connectData.username.cstring="66717";
connectData.password.cstring="eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRJZCI6IjU2IiwidXNlcm5hbWUiOiI2NjcxNyIsInRpbWVzdGFtcCI6MTU5OTU1MDM3MjcyM30.JxIp4oBPth6OmUy6s2RW4_E20mY9IiY9LyfzJk6VwtI";
char* address = "test-mqtt.xxxx.net";
char* sub_topic = "2000stn/service/transmit/66717/3455/#";
char* pub_topic = "2000stn/event/status/66717/123";
NetworkInit(&network);
MQTTClientInit(&client, &network, 30000, sendbuf, sizeof(sendbuf), readbuf, sizeof(readbuf));
mqtt_printf(MQTT_INFO, "Wait Wi-Fi to be connected.");
while(wifi_is_ready_to_transceive(RTW_STA_INTERFACE) != RTW_SUCCESS)
vTaskDelay(5000 / portTICK_PERIOD_MS);
mqtt_printf(MQTT_INFO, "Wi-Fi connected.");
mqtt_printf(MQTT_INFO, "Connect Network \\"%s\\"", address);
while ((rc = NetworkConnect(&network, address, 1883)) != 0)
mqtt_printf(MQTT_INFO, "Return code from network connect is %d\\n", rc);
vTaskDelay(1000 / portTICK_PERIOD_MS);
mqtt_printf(MQTT_INFO, "\\"%s\\" Connected", address);
mqtt_printf(MQTT_INFO, "Start MQTT connection");
while ((rc = MQTTConnect(&client, &connectData)) != 0)
mqtt_printf(MQTT_INFO, "Return code from MQTT connect is %d\\n", rc);
vTaskDelay(1000 / portTICK_PERIOD_MS);
mqtt_printf(MQTT_INFO, "MQTT Connected");
mqtt_printf(MQTT_INFO, "Subscribe to Topic: %s", sub_topic);
if ((rc = MQTTSubscribe(&client, sub_topic, QOS0, messageArrived)) != 0)
mqtt_printf(MQTT_INFO, "Return code from MQTT subscribe is %d\\n", rc);
mqtt_printf(MQTT_INFO, "Publish Topics: %s", pub_topic);
while (1)
MQTTMessage message;
char payload[300];
if (++count == 0)
count = 1;
message.qos = QOS0;
message.retained = 0;
message.payload = payload;
sprintf(payload, "hello from AMEBA %d", count);
message.payloadlen = strlen(payload);
if ((rc = MQTTPublish(&client, pub_topic, &message)) != 0)
mqtt_printf(MQTT_INFO,"Return code from MQTT publish is %d\\n", rc);
if ((rc = MQTTYield(&client, 1000)) != 0)
mqtt_printf(MQTT_INFO,"Return code from yield is %d\\n", rc);
vTaskDelay(5000);
/* do not return */
这是比较简洁的mqtt处理的demo,核心处理收的就是一个回调函数messageArrived,发送和接收是不同topic的,发送其实核心就一个MQTTPublish,但是外围组织数据的逻辑靠自己业务特色来弄了,这个例子基本能搞明白MQTT咋整起来,收到数据的解析和发送数据的组织都是可以在这个基础上展开,相对没那么难搞了。
以上是关于不到百行Demo看清MQTT收发处理逻辑的主要内容,如果未能解决你的问题,请参考以下文章
通过集群的方式解决基于MQTT协议的RabbitMQ消息收发
基于RabbitMQ的MQTT插件搭建MQTT服务,使用MQTTX进行收发测试