MQTT 消息 发布 订阅
当连接向一个mqtt服务器时,clientId必须是唯一的。设置一样,导致client.setCallback总是走到 connectionLost回调。报connection reset。调查一天才发现是clientid重复导致。
client = new MqttAsyncClient(serverURIString, "client-id");
MqttConnectOptions options = new MqttConnectOptions();
paho客户端示例 api文档
MQTT Protocol Manual(Apollo中MQTT协议解析)
更多有关MQTT协议内容,参考the MQTT Specification
<connector id="tcp" bind="tcp://" protocol="mqtt"/>
<connector id="tcp" bind="tcp://">
<detect protocols="mqtt openwire" />
<detect> 下protocols 对应的参数通过空格来隔开支持的通信协议。如果只支持一种协议,就不要空格,默认情况下对任何协议生效。
如果你想调整MQTT默认设置,在apollo.xml文件中有一个<connector> 元素,通过MQTT参数配置:
<connector id="tcp" bind="tcp://">
<mqtt max_message_length="1000" />
: The size (in bytes) of the largest message that can be sent to the broker. Defaults to 100MB(broker能接受的最大消息量:默认是100M)protocol_filters
: A filter which can filter frames being sent/received to and from a client. It can modify the frame or even drop it.(一个控制发送和接收,Client的过滤器框架。可以修改,删除这个框架)die_delay
: How long after a connection is deemed to be “dead” before the connection actually closes; default: 5000ms(在实际断开连接之前,会有默认5000ms的时间被认为连接已经dead)
mqtt 配置元素也可以用来控制目的消息头的解析。下面是支持的参数:
: a tag used to identify destination types; default: null(用来确认目的地类型)path_separator
: used to separate segments in a destination name; default:/(用来分割目的地名称)
: indicate all child-level destinations that match the wildcard; default:+(识别子目录)
: indicate destinations that match the wildcard recursively; default:#(目标地址通配符)
: pattern used to identify the start of a regex(表示正则表达开始)regex_wildcard_end
: pattern used to identify the end of a regex(表示正则表达结束)part_pattern
: allows you to specify a regex that constrains the naming of topics. (你可以指定正则表达规则)default:[ a-zA-Z0-9\\_\\-\\%\\~\\:\\(\\)]+
Client 可用函数库
Apollo 支持MQTT3.1 协议,下面是可用的Clients:
- Java : mqtt-client, MeQanTT
- C : libmosquitto
- Erlang : erlmqtt, my-mqtt4erl
- .NET : MQTTDotNet, nMQTT
- Perl : net-mqtt-perl, [anyevent-mqtt-perl]
- Python : nyamuk
- Ruby : mqtt-ruby, ruby-em-mqtt
- javascript : Node.js MQTT Client
- Delphi : TMQTTCLient
- Device specific: Arduino, mbed, Nanode, Netduino
如果要找到新支持的Clients ,可以检索:the MQTT website for its software
在目录example 目录下,你可以找到一些例子,实现了与broker之间收发。
为了确保broker配置文件的安全,所以只允许一个admin 用户连接,默认的用户名和密码是:admin ,password.
Mqtt 客户端不能specify 虚拟主机(更多请看:see the section on Virtual Hosts in the user guide),以至于默认情况下虚拟主机已经被使用了。通常第一虚拟主机定义在apollo.xml文件中。
Destination 类型
MQTT协议是订阅,发布协议,是不允许真正的利用队列点对点的消息收发。因此Apollo仅允许利用主题,还进行MQTT消息发送。订阅的概念和持久的主题订阅 和其他协议提到的有些类似,同时也被MQTT CONNECT 框架的clean session属性控制。
Clean Sessions
但一个Client 发送一个连接,这个连接中clean session 被设置为false,那么之前连接中有相同Client_id 的session 将会被重复使用。这就意味着Client断开了,订阅依然能收到消息。这就等同与同Apollo建立一个长订阅。
如果 clean session 设置为true ,那么新session就开始了,其他的session会慢慢消失,删除。这就是Apollo中定义的普通的主题订阅。
Topic Retained Messages
如果消息被发布的同时retain 标记被设置,消息将被主题记住,以至于新的订阅到达,最近的retain 消息会被发送到订阅者。比如说:你想发布一个参数,而且你想让最新的这个参数发布到总是可用的订阅了这个主题的客户端上,你就设置在PUBLISH 框架上设置retain 标签。
注意:retained 消息 不会被设置成retained 在 QoS设置为零的broker 重启过程中。
Last Will and Testament Message
当Client第一次连接的时候,有一个will 消息和一个更QoS相关的消息会跟你有关。will消息是一个基础消息,这个基础消息只有在连接异常或者是掉线的时候才会被发送。一般用在你有一个设备,当他们掉了的时候,你需要知道。所以如果一个医疗Client从broker掉线,will消息将会作为一个闹钟主题发送,而且会被系统作为高优先级提醒。
Reliable Messaging
MQTT协议允许Client 发布消息的时候指定Qos参数:
- At Most Once (QoS=0)
- At Least Once (QoS=1)
- Exactly Once (QoS=2)
该设置会确保消息会被至少一次推送到Client。如果推送设置为至少推送一次,Apollo会返回一个回调函数,确保代理已经收到消息,而且确保会确保推送该消息。如果Client 将发布了一个Qos=1的消息,如果在指定的时间内没有收到回复,Client会希望重新发布这个消息。所以可能存在这种情况:代理收到一个需要推送的消息,然后又收到一个消息推送到同一个Client。所以如果传输过程中PUBACK丢失,Client会重新发送,而且不会去检测是否是重发,broker就将消息发送到订阅主题中。
该设置是可靠等级最高的。他会确保发布者不仅仅会推送,而且不会像Qos=1 那样,会被接收两次。当然这个设置会增加网络的负载。当一个消息被发布出去的时候,broker会保存该消息的id,而且会利用任何长连接,坚持要把该消息推送给目标地址。如果Client收到PUBREC 标志,那就表明broker已经收到消息了。 这个时候broker会期待Client发送一个PUBREL 来清除session 中消息id,broker如果发送成功就会发送一个PUBCOMP通知Client。
Wildcard Subscriptions
is used to separate names in a path(分割路径)+
is used to match any name in a path(通配地址任何字符)#
is used to recursively match path names(递归通配)
: Any price for any product on any exchange(任何交易中任何产品的价格)PRICE/STOCK/#
: Any price for a stock on any exchange(任何交易中的股票价格)PRICE/STOCK/NASDAQ/+
: Any stock price on NASDAQ(纳斯达克的任何股票价格)PRICE/STOCK/+/IBM
: Any IBM stock price on any exchange(任何交易中IBM股票价格)
Keep Alive
Apollo只有在Client指定了CONNECT的KeepAlive 值的时候,才会设置保持连接、心跳检测。如果one Client指定了keepalive,apollo 将会使用1.5*keepalive值。这个在MQTT中有说明。
Destination Name Restrictions
路径名称限制了使用(a-z, A-Z, 0-9, _, - %, ~, :, \' \', \'(\', \')\' ,. )字符,通配符(*)在复杂的分隔符中。而且确保使用utf-8来编译你的URL。

1 package; 2 3 import; 4 5 import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; 6 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 7 import org.eclipse.paho.client.mqttv3.IMqttToken; 8 import org.eclipse.paho.client.mqttv3.MqttAsyncClient; 9 import org.eclipse.paho.client.mqttv3.MqttCallback; 10 import org.eclipse.paho.client.mqttv3.MqttClient; 11 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 12 import org.eclipse.paho.client.mqttv3.MqttException; 13 import org.eclipse.paho.client.mqttv3.MqttMessage; 14 import org.eclipse.paho.client.mqttv3.MqttPersistenceException; 15 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 16 17 18 public class MyMqttClient implements MqttCallback { 19 20 private static final MemoryPersistence DATA_STORE = new MemoryPersistence(); 21 private static final String topic = "mytopic"; 22 23 private String HOST = ""; 24 private int PORT = 1883; 25 private String USERNAME = "user"; 26 private String PASSWORD = "password"; 27 private String serverURIString = "tcp://" + HOST + ":" + PORT; 28 29 String clientId = "client-1"; 30 31 MqttAsyncClient client; 32 // Tokens 33 IMqttToken connectToken; 34 IMqttDeliveryToken pubToken; 35 36 37 public static void main(String[] args) { 38 MyMqttClient app = new MyMqttClient(); 39 app.asyncClient(); 40 try { 41 Thread.sleep(20000); 42 app.disconnect(); 43 } catch (Exception e) { 44 e.printStackTrace(); 45 } 46 System.out.println("end"); 47 } 48 49 public void blockingClient() { 50 51 try { 52 MqttClient sampleClient = new MqttClient(serverURIString, clientId); 53 MqttConnectOptions connOpts = new MqttConnectOptions(); 54 connOpts.setCleanSession(true); 55 connOpts.setUserName(USERNAME); 56 connOpts.setPassword(PASSWORD.toCharArray()); 57 System.out.println("Connecting to broker: " + serverURIString); 58 sampleClient.connect(connOpts); 59 sampleClient.subscribe("#", 1); 60 System.out.println("Connected"); 61 // System.out.println("Publish message: " + content); 62 // MqttMessage message = new MqttMessage(content.getBytes()); 63 // message.setQos(qos); 64 sampleClient.setCallback(this); 65 // sampleClient.publish(topic, message); 66 // System.out.println("Message published"); 67 try { 68 Thread.sleep(10000000); 69 System.out.println("Disconnected"); 70 sampleClient.disconnect(); 71 } catch (Exception e) { 72 e.printStackTrace(); 73 } 74 75 } catch (MqttException me) { 76 System.out.println("reason " + me.getReasonCode()); 77 System.out.println("msg " + me.getMessage()); 78 System.out.println("loc " + me.getLocalizedMessage()); 79 System.out.println("cause " + me.getCause()); 80 System.out.println("except " + me); 81 me.printStackTrace(); 82 } 83 } 84 85 public void asyncClient() { 86 info(" MQTT init start."); 87 88 // Tokens 89 IMqttToken connectToken; 90 IMqttDeliveryToken pubToken; 91 92 // Client Options 93 MqttConnectOptions options = new MqttConnectOptions(); 94 options.setCleanSession(false); 95 options.setAutomaticReconnect(true); 96 97 options.setUserName(USERNAME); 98 options.setPassword(PASSWORD.toCharArray()); 99 100 try { 101 client = new MqttAsyncClient(serverURIString, clientId); 102 103 DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions(); 104 disconnectedOpts.setBufferEnabled(true); 105 client.setBufferOpts(disconnectedOpts); 106 107 connectToken = client.connect(options); 108 connectToken.waitForCompletion();//异步变成了同步。可以用IMqttCallbackListen..在connect时候设置回调。 109 boolean isConnected = client.isConnected(); 110 info("Connection isConnected: " + isConnected); 111 112 if (connectToken.isComplete() && connectToken.getException() == null && client.isConnected()) { 113 info("[Connect:] Success: "); //$NON-NLS-1$ //$NON-NLS-2$ 114 client.setCallback(this); 115 116 } else { 117 info("[Connect:] faild: "); //$NON-NLS-1$ //$NON-NLS-2$ 118 } 119 120 // MqttTopic topic = client.getTopic(topic); 121 // topic. 122 //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 123 // options.setWill(topic, "close".getBytes(), 2, true); 124 125 IMqttToken subToken = client.subscribe("#", 1); 126 127 subToken.waitForCompletion(1000); 128 129 if (subToken.isComplete()) { 130 info("subToken complete."); 131 if (subToken.getException() != null) { 132 info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 133 } 134 } else { 135 info("subToken not complete."); 136 if (subToken.getException() != null) { 137 info("Topics Subscription failed." + subToken.getException()); //$NON-NLS-1$ 138 } 139 } 140 141 info("init end"); 142 143 } catch (MqttException e) { 144 // TODO Auto-generated catch block 145 e.printStackTrace(); 146 } 147 148 } 149 150 //String clientId, String topic, String message 151 public void send() { 152 String topic; 153 String message; 154 info("===Send Message start.==="); 155 message = "Hello, boy."; 156 157 158 boolean isConnected = client.isConnected(); 159 if (!isConnected) { 160 //no need. it will auto reconnect and send. 161 } 162 163 // Publish Message 164 try { 165 pubToken = client.publish(topic, new MqttMessage(message.getBytes())); 166 167 info("Publish attempted: isComplete:" + pubToken.isComplete()); 168 169 pubToken.waitForCompletion(); 170 } catch (MqttPersistenceException e) { 171 // TODO Auto-generated catch block 172 e.printStackTrace(); 173 } catch (MqttException e) { 174 // TODO Auto-generated catch block 175 e.printStackTrace(); 176 } 177 178 // Check that Message has been delivered 179 info("Message Delivered: " + pubToken.isComplete()); 180 info("=== send end.===="); 181 } 182 183 void disconnect() { 184 IMqttToken disconnectToken; 185 try { 186 disconnectToken = client.disconnect(); 187 disconnectToken.waitForCompletion(); 188 client.close(); 189 } catch (MqttException e) { 190 // TODO Auto-generated catch block 191 e.printStackTrace(); 192 } 193 client = null; 194 } 195 196 void info(String s) { 197 System.out.println(s); 198 } 199 200 public void connectionLost(Throwable thrwbl) { 201 // TODO Auto-generated method stub 202 info("connectionLost"); 203 204 info("MQTT is disconnected from topic: {}. Message: {}. Cause: {}" + topic + thrwbl.getMessage() + thrwbl.getCause().getMessage()); 205 thrwbl.printStackTrace(); 206 207 } 208 209 public void deliveryComplete(IMqttDeliveryToken arg0) { 210 // TODO Auto-generated method stub 211 info("deliveryComplete"); 212 213 } 214 215 public void messageArrived(String arg0, MqttMessage arg1) throws Exception { 216 // TODO Auto-generated method stub 217 String message = new String(arg1.getPayload()); 218 String topic = arg0; 219 220 info("xxx Receive : topic=" + topic + "; message=" + message); 221 222 } 223 }

1 <project xmlns="" xmlns:xsi="" 2 xsi:schemaLocation=""> 3 <modelVersion>4.0.0</modelVersion> 4 5 <groupId>com.italktv.mqtt.client</groupId> 6 <artifactId>mqttclient</artifactId> 7 <version>0.0.1-SNAPSHOT</version> 8 <packaging>jar</packaging> 9 10 <name>mqttclient</name> 11 <url></url> 12 13 <properties> 14 <>UTF-8</> 15 </properties> 16 17 <dependencies> 18 <dependency> 19 <groupId>junit</groupId> 20 <artifactId>junit</artifactId> 21 <version>3.8.1</version> 22 <scope>test</scope> 23 </dependency> 24 25 26 <dependency> 27 <groupId>org.eclipse.paho</groupId> 28 <artifactId>org.eclipse.paho.client.mqttv3</artifactId> 29 <version>1.1.0</version> 30 </dependency> 31 32 </dependencies> 33 </project>
