Linux C的MQTT测试代码编写 - 跨主机的MQTT客户端通信
Posted Andrea-地面宇航员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux C的MQTT测试代码编写 - 跨主机的MQTT客户端通信相关的知识,希望对你有一定的参考价值。
Linux C的MQTT测试代码编写
做完在同一个主机下的MQTT客户端实验后,做亿点点改变,我们使用移植好的MQTT接口在Ubuntu下写一个MQTT的客户端,然后和windows下的MQTT.fx工具通信。
-
代码
#include <stdio.h> #include <stdlib.h> #include <string.h> #include "MQTTClient.h" //需要在系统中提前安装好MQTT,可以参考 #define ADDRESS "tcp://192.168.50.81:1883" //根据 MQTT 实际主机地址调整 #define CLIENTID_PUB "ExampleClientPub" #define CLIENTID_SUB "ExampleClientSub" #define QOS 1 #define TIMEOUT 10000L #define PUB_TOPIC "ScratchToSoftWare" #define SUB_TOPIC "SoftWareToScratch" #define NUM_THREADS 2 //线程个数,一个用于发布,一个用于订阅 #define FAN_OFF "{'fan':false}" //表示风扇关闭的json字符串 #define FAN_ON "{'fan':true}" //表示风扇打开的json字符串 int fan_state = 0; volatile MQTTClient_deliveryToken deliveredtoken; //传递给MQTTClient_setCallbacks的回调函数,消息发送成功后,调用此回调函数 void delivered(void *context, MQTTClient_deliveryToken dt) { printf("Message with token value %d delivery confirmed\\n", dt); deliveredtoken = dt; } //传递给MQTTClient_setCallbacks的回调函数 消息到达后,调用此回调函数 int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { printf("Message arrived\\n"); printf(" topic: %s\\n", topicName); printf(" message: %.*s\\n", message->payloadlen, (char*)message->payload); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } //传递给MQTTClient_setCallbacks的回调函数 连接异常断开后调用此回调函数 void connlost(void *context, char *cause) { printf("\\nConnection lost\\n"); printf(" cause: %s\\n", cause); } //实现MQTT的订阅 void *mqtt_subscribe() { MQTTClient client; //定义一个MQTT客户端client MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; int rc; //初始化client,设置MQTT服务器的地址"tcp://192.168.50.81:1883" 并订阅"ExampleClientSub" if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID_SUB, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) { printf("Failed to create client, return code %d\\n", rc); rc = EXIT_FAILURE; goto exit; } //设置回调函数, if ((rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS) { printf("Failed to set callbacks, return code %d\\n", rc); rc = EXIT_FAILURE; goto destroy_exit; } conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; //连接服务器 if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\\n", rc); rc = EXIT_FAILURE; goto destroy_exit; } printf("Subscribing to topic %s\\nfor client %s using QoS%d\\n\\n" "Press Q<Enter> to quit\\n\\n", SUB_TOPIC, CLIENTID_SUB, QOS); //订阅主题 if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, QOS)) != MQTTCLIENT_SUCCESS) { printf("Failed to subscribe, return code %d\\n", rc); rc = EXIT_FAILURE; } else { int ch; do { ch = getchar(); } while (ch!='Q' && ch != 'q'); //等待输入字符 'Q'或'q' if ((rc = MQTTClient_unsubscribe(client, SUB_TOPIC)) != MQTTCLIENT_SUCCESS) //取消订阅 { printf("Failed to unsubscribe, return code %d\\n", rc); rc = EXIT_FAILURE; } } if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS) //断开和服务器的连接 { printf("Failed to disconnect, return code %d\\n", rc); rc = EXIT_FAILURE; } destroy_exit: MQTTClient_destroy(&client); //释放客户端的资源 exit: return NULL; } //实现MQTT的发布 void *mqtt_publish() { MQTTClient client; //定义一个MQTT客户端 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; char data[1024]; int rc; //初始化MQTT客户端 if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID_PUB, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) { printf("Failed to create client, return code %d\\n", rc); exit(EXIT_FAILURE); } conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; //连接MQTT服务器 if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\\n", rc); exit(EXIT_FAILURE); } pubmsg.qos = QOS; pubmsg.retained = 0; while(1) { //判断风扇标志位的值,封装发送的消息 if (fan_state == 0) { pubmsg.payload = FAN_ON; pubmsg.payloadlen = strlen(FAN_ON); fan_state = 1; } else { pubmsg.payload = FAN_OFF; pubmsg.payloadlen = strlen(FAN_OFF); fan_state = 0; } //循环发送消息,在"ScratchToSoftWare" 这个主题下发布消息 if ((rc = MQTTClient_publishMessage(client, PUB_TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS) { printf("Failed to publish message, return code %d\\n", rc); exit(EXIT_FAILURE); } printf("Waiting for up to %d seconds for publication of %s\\n" "on topic %s for client with ClientID: %s\\n", (int)(TIMEOUT/1000), data, PUB_TOPIC, CLIENTID_PUB); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); printf("Message with delivery token %d delivered\\n", token); usleep(5000000); } if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS) printf("Failed to disconnect, return code %d\\n", rc); MQTTClient_destroy(&client); return NULL; } int main(int argc, char **argv) { pthread_t threads[NUM_THREADS]; pthread_create(&threads[0], 0, mqtt_subscribe, NULL); pthread_create(&threads[1], 0, mqtt_publish, NULL); pause(); return 0; }
-
编译命令
$gcc main.c -lpthread -lpaho-mqtt3c
-
效果展示
以上是关于Linux C的MQTT测试代码编写 - 跨主机的MQTT客户端通信的主要内容,如果未能解决你的问题,请参考以下文章