Linux C的MQTT测试代码编写 - 跨主机的MQTT客户端通信

Posted Andrea-地面宇航员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux C的MQTT测试代码编写 - 跨主机的MQTT客户端通信相关的知识,希望对你有一定的参考价值。

Linux C的MQTT测试代码编写

做完在同一个主机下的MQTT客户端实验后,做亿点点改变,我们使用移植好的MQTT接口在Ubuntu下写一个MQTT的客户端,然后和windows下的MQTT.fx工具通信。

  1. 代码

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "MQTTClient.h"  //需要在系统中提前安装好MQTT,可以参考
    #define ADDRESS "tcp://192.168.50.81:1883" //根据 MQTT 实际主机地址调整
    #define CLIENTID_PUB "ExampleClientPub"
    #define CLIENTID_SUB "ExampleClientSub"
    #define QOS 1
    #define TIMEOUT 10000L
    #define PUB_TOPIC "ScratchToSoftWare" 
    #define SUB_TOPIC "SoftWareToScratch" 
    #define NUM_THREADS 2    		  //线程个数,一个用于发布,一个用于订阅
    #define FAN_OFF "{'fan':false}"   //表示风扇关闭的json字符串
    #define FAN_ON "{'fan':true}"     //表示风扇打开的json字符串 
    int fan_state = 0; 
    volatile MQTTClient_deliveryToken deliveredtoken;
    
    
    //传递给MQTTClient_setCallbacks的回调函数,消息发送成功后,调用此回调函数
    void delivered(void *context, MQTTClient_deliveryToken dt)
    {
    	printf("Message with token value %d delivery confirmed\\n", dt);
    	deliveredtoken = dt;
    }
    
    //传递给MQTTClient_setCallbacks的回调函数 消息到达后,调用此回调函数 
    int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
    {
    	printf("Message arrived\\n");
    	printf(" topic: %s\\n", topicName);
    	printf(" message: %.*s\\n", message->payloadlen, (char*)message->payload);
    	MQTTClient_freeMessage(&message);
    	MQTTClient_free(topicName);
    	return 1; 
    }
    
    //传递给MQTTClient_setCallbacks的回调函数 连接异常断开后调用此回调函数 
    void connlost(void *context, char *cause)
    {
    	printf("\\nConnection lost\\n");
    	printf(" cause: %s\\n", cause);
    }
    
    //实现MQTT的订阅 
    void *mqtt_subscribe()
    {
    	MQTTClient client;  //定义一个MQTT客户端client
    	MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    	int rc;
    	//初始化client,设置MQTT服务器的地址"tcp://192.168.50.81:1883" 并订阅"ExampleClientSub"
    	if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID_SUB,
    					MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
    	{
    		printf("Failed to create client, return code %d\\n", rc);
    		rc = EXIT_FAILURE;
    		goto exit;
    	}
    	//设置回调函数,
    	if ((rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd,
    					delivered)) != MQTTCLIENT_SUCCESS)
    	{
    		printf("Failed to set callbacks, return code %d\\n", rc);
    		rc = EXIT_FAILURE;
    		goto destroy_exit;
    	}
    	conn_opts.keepAliveInterval = 20;
    	conn_opts.cleansession = 1;
    	//连接服务器
    	if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    	{
    		printf("Failed to connect, return code %d\\n", rc);
    		rc = EXIT_FAILURE;
    		goto destroy_exit;
    	}
    	printf("Subscribing to topic %s\\nfor client %s using QoS%d\\n\\n"
    			"Press Q<Enter> to quit\\n\\n", SUB_TOPIC, CLIENTID_SUB, QOS);
    
    	//订阅主题 
    	if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, QOS)) != MQTTCLIENT_SUCCESS)
    	{
    		printf("Failed to subscribe, return code %d\\n", rc);
    		rc = EXIT_FAILURE;
    	}
    	else
    	{
    		int ch;
    		do
    		{
    			ch = getchar();
    		} while (ch!='Q' && ch != 'q');  //等待输入字符 'Q'或'q'
    		if ((rc = MQTTClient_unsubscribe(client, SUB_TOPIC)) != MQTTCLIENT_SUCCESS)  	//取消订阅 
    		{
    			printf("Failed to unsubscribe, return code %d\\n", rc);
    			rc = EXIT_FAILURE;
    		} }
    	if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)  //断开和服务器的连接 
    	{
    		printf("Failed to disconnect, return code %d\\n", rc);
    		rc = EXIT_FAILURE;
    	}
    destroy_exit:
    	MQTTClient_destroy(&client); //释放客户端的资源 
    exit:
    	return NULL; 
    }
    
    //实现MQTT的发布 
    void *mqtt_publish()
    {
    	MQTTClient client;  //定义一个MQTT客户端 
    	MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    	MQTTClient_message pubmsg = MQTTClient_message_initializer;
    	MQTTClient_deliveryToken token;
    	char data[1024];
    	int rc;
    	//初始化MQTT客户端 
    	if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID_PUB,
    					MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
    	{
    		printf("Failed to create client, return code %d\\n", rc);
    		exit(EXIT_FAILURE);
    	}
    	conn_opts.keepAliveInterval = 20;
    	conn_opts.cleansession = 1;
    	//连接MQTT服务器 
    	if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    	{
    		printf("Failed to connect, return code %d\\n", rc);
    		exit(EXIT_FAILURE);
    	}
    	pubmsg.qos = QOS;
    	pubmsg.retained = 0;
    	while(1) {
    		//判断风扇标志位的值,封装发送的消息
    		if (fan_state == 0) {
    			pubmsg.payload = FAN_ON;
    			pubmsg.payloadlen = strlen(FAN_ON);
    			fan_state = 1; } else {
    				pubmsg.payload = FAN_OFF;
    				pubmsg.payloadlen = strlen(FAN_OFF);
    				fan_state = 0; }
    		//循环发送消息,在"ScratchToSoftWare" 这个主题下发布消息
    		if ((rc = MQTTClient_publishMessage(client, PUB_TOPIC,
    						&pubmsg, &token)) != MQTTCLIENT_SUCCESS)
    		{
    			printf("Failed to publish message, return code %d\\n", rc);
    			exit(EXIT_FAILURE);
    		}
    		printf("Waiting for up to %d seconds for publication of %s\\n"
    				"on topic %s for client with ClientID: %s\\n", (int)(TIMEOUT/1000), data, PUB_TOPIC, CLIENTID_PUB);
    		rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    		printf("Message with delivery token %d delivered\\n", token);
    		usleep(5000000);
    	}
    	if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
    		printf("Failed to disconnect, return code %d\\n", rc);
    	MQTTClient_destroy(&client);
    	return NULL; 
    }
    
    
    int main(int argc, char **argv)
    {
    	pthread_t threads[NUM_THREADS];
    	pthread_create(&threads[0], 0, mqtt_subscribe, NULL);
    	pthread_create(&threads[1], 0, mqtt_publish, NULL);
    	pause();
    	return 0; 
    }
    
    
  2. 编译命令

    $gcc main.c -lpthread -lpaho-mqtt3c
    
  3. 效果展示

以上是关于Linux C的MQTT测试代码编写 - 跨主机的MQTT客户端通信的主要内容,如果未能解决你的问题,请参考以下文章

GO语言(十六):模糊测试入门(上)

如何编写 C 应用程序代码来测试 linux 音频驱动程序

使用locust测试MQTT协议

paho.mqtt.python模块怎么安装

写一个C语言程序: 能修改Linux 主机的IP、DHCP、DNS 等设置

MQTT使用与C接口测试