MQTT 消息 发布 订阅

Posted Bigben

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQTT 消息 发布 订阅相关的知识,希望对你有一定的参考价值。


 

当连接向一个mqtt服务器时,clientId必须是唯一的。设置一样,导致client.setCallback总是走到 connectionLost回调。报connection reset。调查一天才发现是clientid重复导致。

client = new MqttAsyncClient(serverURIString, "client-id");

 


 

clientId是用来保存会话信息。

MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);

当服务器将message发布向所有订阅过的客户端,就会清除这个message。如果当前客户端不在线,等它连接时发送。

 

session与client之间的关系是怎样的?

这样的,比如你一个板子,作为客户端,发起mqtt的连接请求connect到mqtt服务器,比如说就是emqtt服务吧,emqtt服务端收到这个板子的连接请求之后,在tcp层上会和板子建立一个tcp的连接,在emqtt内部,会产生一个进程,和这个板子做数据通讯,同时还会产生一个进程,叫session,这个sessoin是专门管理这个板子订阅的主题,其它板子如果发布了这个板子感兴趣的主题的时候,也会发到这个板子对应的这个session里面,如果这个session收到订阅的主题之后,发现对用的client还活着,就通过这个client把数据经过tcp发到这个板子上,如果发现client已经没有了,就是说板子和服务端断掉了,那么session就会把收到的订阅的主题,先保存在session里面,下次板子连接上了,而且cleansession=false,那么这个session就不会清除,在这次连接时,就会把以前收到的订阅消息,发给板子,大概就是这个意思。


 

参考:

http://www.blogjava.net/yongboy/archive/2014/02/15/409893.html

http://www.cnblogs.com/znlgis/p/4930990.html

http://blog.csdn.net/ljf10010/article/details/51424506

paho客户端示例

 

https://github.com/eclipse/paho.mqtt.java/tree/master/org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test

http://www.eclipse.org/paho/files/javadoc/index.html api文档

ibm客户端paho示例:

http://www.programcreek.com/java-api-examples/index.php?source_dir=streamsx.messaging-master/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttAsyncClientWrapper.java

hivemq的mqtt详解:http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages

 

 http://blog.csdn.net/wuyinxian/article/details/38826259

MQTT Protocol Manual(Apollo中MQTT协议解析)

 
 
 

 
 MQTT协议
Apollo允许客户端通过开放的MQTT协议连接。该协议主要是用在资源有限的驱动上,以及网络不稳定的情况下使用,是一个订阅、发布模型。这种驱动通常不适用类似http,stomp这类基于文本,或者类似openfire,AMQP等传统二进制协议。MQTT是一个简介的二进制协议,适用这类驱动资源受限,而且是不稳定的网络条件下。
之前的稳定发布版本中,MQTT是作为一个Apollo的一个插件提供的。但是现在,这个插件已经变为开发项目的一部分。MQTT在Apollo中已经不需要其他配置文件或者是第三方插件支持了。
MQTT是一个线路层的协议,任何实现该协议的客户端都可以连接到Apollo。当然也可以整合其他MQTT兼容的消息代理中。
更多有关MQTT协议内容,参考the MQTT Specification
 MQTT协议配置
为了开始使用MQTT协议,首先使用MQTT3.1协议的客户端,连接到Apollo正在监听端口。Apollo会做协议检测,而且自动识别MQTT连接,而且将连接作为MQTT协议处理。你不必要为MQTT协议打开一个端口(STomp,Openfire,AMQP等都是自动识别)。如果你一定指定连接的协议,有下面两种方式:你可以选择不用协议识别,而是为MQTT指定连接:
<connector id="tcp" bind="tcp://0.0.0.0:61613" protocol="mqtt"/>
或者你可以限制哪种协议可以被自动识别。通过下面的<detece>配置方式:
<connector id="tcp" bind="tcp://0.0.0.0:61613">
  <detect protocols="mqtt openwire" />
</connector>
<detect> 下protocols 对应的参数通过空格来隔开支持的通信协议。如果只支持一种协议,就不要空格,默认情况下对任何协议生效。
如果你想调整MQTT默认设置,在apollo.xml文件中有一个<connector> 元素,通过MQTT参数配置:
<connector id="tcp" bind="tcp://0.0.0.0:61613">
  <mqtt max_message_length="1000" />
</connector>
MQTT元素支持下面几个参数:
  • max_message_length : The size (in bytes) of the largest message that can be sent to the broker. Defaults to 100MB(broker能接受的最大消息量:默认是100M)
  • protocol_filters : A filter which can filter frames being sent/received to and from a client. It can modify the frame or even drop it.(一个控制发送和接收,Client的过滤器框架。可以修改,删除这个框架)
  • die_delay : How long after a connection is deemed to be “dead” before the connection actually closes; default: 5000ms(在实际断开连接之前,会有默认5000ms的时间被认为连接已经dead)
mqtt 配置元素也可以用来控制目的消息头的解析。下面是支持的参数:
  • queue_prefix : a tag used to identify destination types; default: null(用来确认目的地类型)
  • path_separator : used to separate segments in a destination name; default: /(用来分割目的地名称)
  • any_child_wildcard : indicate all child-level destinations that match the wildcard; default: +(识别子目录)
  • any_descendant_wildcard : indicate destinations that match the wildcard recursively; default:#(目标地址通配符)
  • regex_wildcard_start : pattern used to identify the start of a regex(表示正则表达开始)
  • regex_wildcard_end : pattern used to identify the end of a regex(表示正则表达结束)
  • part_pattern : allows you to specify a regex that constrains the naming of topics. (你可以指定正则表达规则)default: [ a-zA-Z0-9\\_\\-\\%\\~\\:\\(\\)]+
 Client 可用函数库
Apollo 支持MQTT3.1 协议,下面是可用的Clients:
如果要找到新支持的Clients ,可以检索:the MQTT website for its software
在目录example 目录下,你可以找到一些例子,实现了与broker之间收发。
 connecting
为了确保broker配置文件的安全,所以只允许一个admin 用户连接,默认的用户名和密码是:admin ,password.
Mqtt 客户端不能specify 虚拟主机(更多请看:see the section on Virtual Hosts in the user guide),以至于默认情况下虚拟主机已经被使用了。通常第一虚拟主机定义在apollo.xml文件中。
 Destination 类型
MQTT协议是订阅,发布协议,是不允许真正的利用队列点对点的消息收发。因此Apollo仅允许利用主题,还进行MQTT消息发送。订阅的概念和持久的主题订阅 和其他协议提到的有些类似,同时也被MQTT CONNECT 框架的clean session属性控制。
 Clean Sessions
但一个Client 发送一个连接,这个连接中clean session 被设置为false,那么之前连接中有相同Client_id 的session 将会被重复使用。这就意味着Client断开了,订阅依然能收到消息。这就等同与同Apollo建立一个长订阅。
如果 clean session 设置为true ,那么新session就开始了,其他的session会慢慢消失,删除。这就是Apollo中定义的普通的主题订阅。
 Topic Retained Messages
如果消息被发布的同时retain 标记被设置,消息将被主题记住,以至于新的订阅到达,最近的retain 消息会被发送到订阅者。比如说:你想发布一个参数,而且你想让最新的这个参数发布到总是可用的订阅了这个主题的客户端上,你就设置在PUBLISH 框架上设置retain 标签。
注意:retained 消息 不会被设置成retained 在 QoS设置为零的broker 重启过程中。

Last Will and Testament Message

当Client第一次连接的时候,有一个will 消息和一个更QoS相关的消息会跟你有关。will消息是一个基础消息,这个基础消息只有在连接异常或者是掉线的时候才会被发送。一般用在你有一个设备,当他们掉了的时候,你需要知道。所以如果一个医疗Client从broker掉线,will消息将会作为一个闹钟主题发送,而且会被系统作为高优先级提醒。

Reliable Messaging

MQTT协议允许Client 发布消息的时候指定Qos参数:
  • At Most Once (QoS=0)
  • At Least Once (QoS=1)
  • Exactly Once (QoS=2)
 最多一次
这个设置时推送消息给Client,可靠性最低的一种。如果设置Qos=0,那broker就不会返回结果码,告诉你他收到消息了,也不会在失败后尝试重发。这有点像不可靠消息,如JMS。
 至少一次
该设置会确保消息会被至少一次推送到Client。如果推送设置为至少推送一次,Apollo会返回一个回调函数,确保代理已经收到消息,而且确保会确保推送该消息。如果Client 将发布了一个Qos=1的消息,如果在指定的时间内没有收到回复,Client会希望重新发布这个消息。所以可能存在这种情况:代理收到一个需要推送的消息,然后又收到一个消息推送到同一个Client。所以如果传输过程中PUBACK丢失,Client会重新发送,而且不会去检测是否是重发,broker就将消息发送到订阅主题中。
 恰好一次
该设置是可靠等级最高的。他会确保发布者不仅仅会推送,而且不会像Qos=1 那样,会被接收两次。当然这个设置会增加网络的负载。当一个消息被发布出去的时候,broker会保存该消息的id,而且会利用任何长连接,坚持要把该消息推送给目标地址。如果Client收到PUBREC 标志,那就表明broker已经收到消息了。 这个时候broker会期待Client发送一个PUBREL 来清除session 中消息id,broker如果发送成功就会发送一个PUBCOMP通知Client。

Wildcard Subscriptions

通配用在主题的目标地址中。这能实现一个主题发送到多个用户,或者多层用户中。
  • / is used to separate names in a path(分割路径)
  • + is used to match any name in a path(通配地址任何字符)
  • # is used to recursively match path names(递归通配)
比如通配可能这样来用:
  • PRICE/# : Any price for any product on any exchange(任何交易中任何产品的价格)
  • PRICE/STOCK/# : Any price for a stock on any exchange(任何交易中的股票价格)
  • PRICE/STOCK/NASDAQ/+ : Any stock price on NASDAQ(纳斯达克的任何股票价格)
  • PRICE/STOCK/+/IBM : Any IBM stock price on any exchange(任何交易中IBM股票价格)

Keep Alive

Apollo只有在Client指定了CONNECT的KeepAlive 值的时候,才会设置保持连接、心跳检测。如果one Client指定了keepalive,apollo 将会使用1.5*keepalive值。这个在MQTT中有说明。

Destination Name Restrictions

路径名称限制了使用(a-z, A-Z, 0-9, _, - %, ~, :, \' \', \'(\', \')\' ,. )字符,通配符(*)在复杂的分隔符中。而且确保使用utf-8来编译你的URL。
  1 package com.xxx.mqtt;
  2 
  3 import java.net.URI;
  4 
  5 import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
  6 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  7 import org.eclipse.paho.client.mqttv3.IMqttToken;
  8 import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
  9 import org.eclipse.paho.client.mqttv3.MqttCallback;
 10 import org.eclipse.paho.client.mqttv3.MqttClient;
 11 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 12 import org.eclipse.paho.client.mqttv3.MqttException;
 13 import org.eclipse.paho.client.mqttv3.MqttMessage;
 14 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
 15 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 16 
 17 
 18 public class MyMqttClient implements MqttCallback {
 19 
 20     private static final MemoryPersistence DATA_STORE = new MemoryPersistence();
 21     private static final String topic = "mytopic";
 22 
 23     private String HOST = "127.0.0.1";
 24     private int PORT = 1883;
 25     private String USERNAME = "user";
 26     private String PASSWORD = "password";
 27     private String serverURIString = "tcp://" + HOST + ":" + PORT;
 28     
 29     String clientId = "client-1";
 30 
 31     MqttAsyncClient client;
 32     // Tokens
 33     IMqttToken connectToken;
 34     IMqttDeliveryToken pubToken;
 35 
 36 
 37     public static void main(String[] args) {
 38         MyMqttClient app = new MyMqttClient();
 39         app.asyncClient();
 40         try {
 41             Thread.sleep(20000);
 42             app.disconnect();
 43         } catch (Exception e) {
 44             e.printStackTrace();
 45         }
 46         System.out.println("end");
 47     }
 48 
 49     public void blockingClient() {
 50         
 51         try {
 52             MqttClient sampleClient = new MqttClient(serverURIString, clientId);
 53             MqttConnectOptions connOpts = new MqttConnectOptions();
 54             connOpts.setCleanSession(true);
 55             connOpts.setUserName(USERNAME);
 56             connOpts.setPassword(PASSWORD.toCharArray());
 57             System.out.println("Connecting to broker: " + serverURIString);
 58             sampleClient.connect(connOpts);
 59             sampleClient.subscribe("#", 1);
 60             System.out.println("Connected");
 61 //                System.out.println("Publish message: " + content);
 62 //                MqttMessage message = new MqttMessage(content.getBytes());
 63 //                message.setQos(qos);
 64             sampleClient.setCallback(this);
 65 //                sampleClient.publish(topic, message);
 66 //                System.out.println("Message published");
 67             try {
 68                 Thread.sleep(10000000);
 69                 System.out.println("Disconnected");
 70                 sampleClient.disconnect();
 71             } catch (Exception e) {
 72                 e.printStackTrace();
 73             }
 74 
 75         } catch (MqttException me) {
 76             System.out.println("reason " + me.getReasonCode());
 77             System.out.println("msg " + me.getMessage());
 78             System.out.println("loc " + me.getLocalizedMessage());
 79             System.out.println("cause " + me.getCause());
 80             System.out.println("except " + me);
 81             me.printStackTrace();
 82         }
 83     }
 84 
 85     public void asyncClient() {
 86         info(" MQTT init start.");
 87 
 88         // Tokens
 89         IMqttToken connectToken;
 90         IMqttDeliveryToken pubToken;
 91 
 92         // Client Options
 93         MqttConnectOptions options = new MqttConnectOptions();
 94         options.setCleanSession(false);
 95         options.setAutomaticReconnect(true);
 96 
 97         options.setUserName(USERNAME);
 98         options.setPassword(PASSWORD.toCharArray());
 99 
100         try {
101             client = new MqttAsyncClient(serverURIString, clientId);
102 
103             DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions();
104             disconnectedOpts.setBufferEnabled(true);
105             client.setBufferOpts(disconnectedOpts);
106 
107             connectToken = client.connect(options);
108             connectToken.waitForCompletion();//异步变成了同步。可以用IMqttCallbackListen..在connect时候设置回调。
109             boolean isConnected = client.isConnected();
110             info("Connection isConnected: " + isConnected);
111 
112             if (connectToken.isComplete() && connectToken.getException() == null && client.isConnected()) {
113                 info("[Connect:] Success: "); //$NON-NLS-1$ //$NON-NLS-2$ 
114                 client.setCallback(this);
115 
116             } else {
117                 info("[Connect:] faild: "); //$NON-NLS-1$ //$NON-NLS-2$ 
118             }
119 
120 //             MqttTopic topic = client.getTopic(topic); 
121 //             topic.
122             //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息    
123 //                options.setWill(topic, "close".getBytes(), 2, true);  
124 
125             IMqttToken subToken = client.subscribe("#", 1);
126 
127             subToken.waitForCompletion(1000);
128 
129             if (subToken.isComplete()) {
130                 info("subToken  complete.");
131                 if (subToken.getException() != null) {
132                     info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 
133                 }
134             } else {
135                 info("subToken not complete.");
136                 if (subToken.getException() != null) {
137                     info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 
138                 }
139             }
140 
141             info("init end");
142 
143         } catch (MqttException e) {
144             // TODO Auto-generated catch block
145             e.printStackTrace();
146         }
147 
148     }
149 
150 //String clientId, String topic, String message
151     public void send() {
152         String topic;
153         String message;
154         info("===Send Message start.===");
155         message = "Hello, boy.";
156         
157 
158         boolean isConnected = client.isConnected();
159         if (!isConnected) {
160             //no need. it will auto reconnect and send.
161         }
162 
163         // Publish Message
164         try {
165             pubToken = client.publish(topic, new MqttMessage(message.getBytes()));
166 
167             info("Publish attempted: isComplete:" + pubToken.isComplete());
168 
169             pubToken.waitForCompletion();
170         } catch (MqttPersistenceException e) {
171             // TODO Auto-generated catch block
172             e.printStackTrace();
173         } catch (MqttException e) {
174             // TODO Auto-generated catch block
175             e.printStackTrace();
176         }
177 
178         // Check that Message has been delivered
179         info("Message Delivered: " + pubToken.isComplete());
180         info("=== send end.====");
181     }
182 
183     void disconnect() {
184         IMqttToken disconnectToken;
185         try {
186             disconnectToken = client.disconnect();
187             disconnectToken.waitForCompletion();
188             client.close();
189         } catch (MqttException e) {
190             // TODO Auto-generated catch block
191             e.printStackTrace();
192         }
193         client = null;
194     }
195 
196     void info(String s) {
197         System.out.println(s);
198     }
199 
200     public void connectionLost(Throwable thrwbl) {
201         // TODO Auto-generated method stub
202         info("connectionLost");
203 
204         info("MQTT is disconnected from topic: {}. Message: {}. Cause: {}" + topic + thrwbl.getMessage() + thrwbl.getCause().getMessage());
205         thrwbl.printStackTrace();
206 
207     }
208 
209     public void deliveryComplete(IMqttDeliveryToken arg0) {
210         // TODO Auto-generated method stub
211         info("deliveryComplete");
212 
213     }
214 
215     public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
216         // TODO Auto-generated method stub
217         String message = new String(arg1.getPayload());
218         String topic = arg0;
219 
220         info("xxx Receive : topic=" + topic + "; message=" + message);
221 
222     }
223 }
View Code
 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3     <modelVersion>4.0.0</modelVersion>
 4 
 5     <groupId>com.italktv.mqtt.client</groupId>
 6     <artifactId>mqttclient</artifactId>
 7     <version>0.0.1-SNAPSHOT</version>
 8     <packaging>jar</packaging>
 9 
10     <name>mqttclient</name>
11     <url>http://maven.apache.org</url>
12 
13     <properties>
14         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15     </properties>
16 
17     <dependencies>
18         <dependency>
19             <groupId>junit</groupId>
20             <artifactId>junit</artifactId>
21             <version>3.8.1</version>
22             <scope>test</scope>
23         </dependency>
24 
25 
26         <dependency>
27             <groupId>org.eclipse.paho</groupId>
28             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
29             <version>1.1.0</version>
30         </dependency>
31 
32     </dependencies>
33 </project>
View Code

上面是maven管理项目的pom.xml

以上是关于MQTT 消息 发布 订阅的主要内容,如果未能解决你的问题,请参考以下文章

Android使用MQTT订阅及发布消息(初步了解Mqtt以及实现Android操作mqtt服务)

python使用MQTT协议发送订阅消息

MQTT 消息 发布 订阅

SpringBoot2.x集成MQTT实现消息订阅(附源码)

MQTT 简介

MQTT协议