不到百行Demo看清MQTT收发处理逻辑

Posted 太阳德生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了不到百行Demo看清MQTT收发处理逻辑相关的知识,希望对你有一定的参考价值。

static void messageArrived(MessageData* data)

	mqtt_printf(MQTT_INFO, "Message arrived on topic %s: %s\\n",data->topicName->lenstring.data,(char*)data->message->payload);

	/**/
	memset((char*)data->message->payload,0,(char*)data->message->payloadlen);//����


void MQTTTaskDemo(void *pvParameters)

	/* To avoid gcc warnings */
	( void ) pvParameters;
	
	/* connect to gpssensor.ddns.net, subscribe to a topic, send and receive messages regularly every 5 sec */
	MQTTClient client;
	Network network;
	unsigned char sendbuf[512], readbuf[80];
	int rc = 0, count = 0;
	MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
//	char* address = "www.xuhongv.com";
//	char* sub_topic = "LASS/Test/Pm25Ameba/cc";
//	char* pub_topic = "LASS/Test/Pm25Ameba/FT1_018";
	connectData.MQTTVersion = 4;
	connectData.clientID.cstring = "56";
	connectData.username.cstring="66717";
	connectData.password.cstring="eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRJZCI6IjU2IiwidXNlcm5hbWUiOiI2NjcxNyIsInRpbWVzdGFtcCI6MTU5OTU1MDM3MjcyM30.JxIp4oBPth6OmUy6s2RW4_E20mY9IiY9LyfzJk6VwtI";
	char* address = "test-mqtt.xxxx.net";
	char* sub_topic = "2000stn/service/transmit/66717/3455/#";
	char* pub_topic = "2000stn/event/status/66717/123";
	
	NetworkInit(&network);
	MQTTClientInit(&client, &network, 30000, sendbuf, sizeof(sendbuf), readbuf, sizeof(readbuf));

	mqtt_printf(MQTT_INFO, "Wait Wi-Fi to be connected.");
	while(wifi_is_ready_to_transceive(RTW_STA_INTERFACE) != RTW_SUCCESS) 
		vTaskDelay(5000 / portTICK_PERIOD_MS);
	
	mqtt_printf(MQTT_INFO, "Wi-Fi connected.");
	
	mqtt_printf(MQTT_INFO, "Connect Network \\"%s\\"", address);
	while ((rc = NetworkConnect(&network, address, 1883)) != 0)
		mqtt_printf(MQTT_INFO, "Return code from network connect is %d\\n", rc);
		vTaskDelay(1000 / portTICK_PERIOD_MS);
	
	mqtt_printf(MQTT_INFO, "\\"%s\\" Connected", address);

	mqtt_printf(MQTT_INFO, "Start MQTT connection");
	while ((rc = MQTTConnect(&client, &connectData)) != 0)
		mqtt_printf(MQTT_INFO, "Return code from MQTT connect is %d\\n", rc);
		vTaskDelay(1000 / portTICK_PERIOD_MS);
	
	mqtt_printf(MQTT_INFO, "MQTT Connected");

	mqtt_printf(MQTT_INFO, "Subscribe to Topic: %s", sub_topic);
	if ((rc = MQTTSubscribe(&client, sub_topic, QOS0, messageArrived)) != 0) 
		mqtt_printf(MQTT_INFO, "Return code from MQTT subscribe is %d\\n", rc);

	mqtt_printf(MQTT_INFO, "Publish Topics: %s", pub_topic);
	while (1)
	
		MQTTMessage message;
		char payload[300];

		if (++count == 0)
			count = 1;
		
		message.qos = QOS0;
		message.retained = 0;
		message.payload = payload;
		sprintf(payload, "hello from AMEBA %d", count);
		message.payloadlen = strlen(payload);

		if ((rc = MQTTPublish(&client, pub_topic, &message)) != 0)
			mqtt_printf(MQTT_INFO,"Return code from MQTT publish is %d\\n", rc);
		if ((rc = MQTTYield(&client, 1000)) != 0)
			mqtt_printf(MQTT_INFO,"Return code from yield is %d\\n", rc);
		vTaskDelay(5000);
	
	/* do not return */

这是比较简洁的mqtt处理的demo,核心处理收的就是一个回调函数messageArrived,发送和接收是不同topic的,发送其实核心就一个MQTTPublish,但是外围组织数据的逻辑靠自己业务特色来弄了,这个例子基本能搞明白MQTT咋整起来,收到数据的解析和发送数据的组织都是可以在这个基础上展开,相对没那么难搞了。

以上是关于不到百行Demo看清MQTT收发处理逻辑的主要内容,如果未能解决你的问题,请参考以下文章

还在手动埋点么?out 了!不到百行代码实现自动埋点

springboot下mqtt简单使用

通过集群的方式解决基于MQTT协议的RabbitMQ消息收发

基于RabbitMQ的MQTT插件搭建MQTT服务,使用MQTTX进行收发测试

MT7621加 OPENWRT 移植MQTT(paho.mqtt.c) 进行数据的收发

转: RabbitMQ实现中AMQP与MQTT消息收发异同