物联网数据传输协议MQTT介绍与应用开发详解
Posted 咬定青松
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了物联网数据传输协议MQTT介绍与应用开发详解相关的知识,希望对你有一定的参考价值。
本文首发微信公众号:码上观世界
Part 1 物联网概述
1. 物联网概念
物联网是指通过各种信息传感器、射频识别技术、全球定位系统、红外感应器、激光扫描器等各种装置与技术,实时采集任何需要监控、 连接、互动的物体或过程,采集其声、光、热、电、力学、化 学、生物、位置等各种需要的信息,通过各类可能的网络接入,实现物与物、物与人的泛在连接,实现对物品和过程的智能化感知、识别和管理。
物联网是一个基于互联网、传统电信网等的信息承载体,它让所有能够被独立寻址的普通物理对象形成互联互通的网络。
2. 物联网发展的问题与瓶颈
(1)技术标准的统一与协调。目前,传统互联网的标准并不适合物联网。物联网感知层的数据多源异构,不同的设备有不同的接口,不同的技术标准;网络层、应用层也由于使用的网络类型不同、行业的应用方向不同而存在不同的网络协议和体系结构。建立的统一的物联网体系架构,统一的技术标准是物联网现在正在面对的难题
(2)碎片化。由于终端设备的多样化,物联网的开发和应用存在较严重的碎片化问题,主要体现在以下几个方面:
电气接口的碎片化。传统电气接口分为模拟信号、数字信号的传输模式,数字信号又有无数种通讯协议,所以电器接口访问问题给处理器编程带来了繁重任务,集成化的通讯模块的电气接口也同样存在问题;
终端传感器的访问协议碎片化。每个传感器的配置、访问方式和通讯协议均不同,也需要进行重复的编程和配置;
终端通讯接入方式碎片化。物联网终端可能是有线网络接入或者总线接入,也可能无线网络接入,而无线网络接入协议众多,如近距离的蓝牙、超宽带,中距离的zigbee WIFI,还有传统广域的2G、4G接入以及近年来兴起的Lora、NB-IoT等;
处理器碎片化。处理器纷繁复杂,对应不同的处理器,需要进行不同的板级配置,没有统一标准的板级硬件;
物联网平台的碎片化。由于物联网到终端的通讯协议没有统一,物联网平台传输协议各不相同,传入不同的云服务器需要进行重复的编程工作。
3. 物联网通讯协议
在物联网应用中,通信技术包括Wi-Fi、RFID、NFC、ZigBee、Bluetooth、LoRa、NB-IoT、GSM、GPRS、3/4/5G网络、Ethernet、RS232、RS485、USB等。物联网技术框架体系中所使用到的通讯协议主要有:AMQP、JMS、REST、HTTP/HTTPS、COAP、DDS 、MQTT等。
由于物联网设备受限于工作环境和其特殊用途,使其应用有特别的要求,比如因为需要大量部署终端,对设备价格和维护的成本要求不能过高,终端设备本身资源占用极低,对网络带宽、数据传输大小、电源消耗都要求苛刻,另外安全性也是物联网设备的重要考虑方面,因此为了满足物联网设备的工作要求,除了在硬件上保证设备的低功耗、安全与轻量,在软件上也同样需要新的网络通信协议和安全保障。
根据Eclipse 基金会的调查,MQTT是物联网解决方案中最常用的消息传递协议,详情见下图:
其中MQTT协议是20世纪90年代中期,IBM为帮助石油和天然气公司客户实现数千英里长的石油和天然气管道的无人值守监控,将管道上的传感器数据通过卫星通信传输到监控中心而设计的数据传输协议。
当时设计数据传输协议面临的场景和需要解决的问题是:
石油天然气管道线路非常长,要接许多沿线的数据采集网关,服务器要能接成千上万个通信客户端;
石油管道传感器的数据采集频率不高,而且每次传输数据量不大,不需要传输大量数据;
现场采集网关由于量大,考虑到采购成本,CPU和存储等计算资源都很有限,协议客户端软件要能在CPU和存储等计算资源都很有限的单片机、单板机、RTU等上运行,并能方便的实现移植到不同的硬件上;
通信流量费用高昂,需要最大限度地减少带宽和传输消息大小,石油管道会穿越很多无人区,附近没有网络设施,因此只能使用卫星通讯,但卫星链路带宽低(当然也有高带宽的),通信流量费用高昂,因此需要尽量节省传输的消息的流量开销;
高轨道的GEO卫星站得高看得远,覆盖范围广,但轨道高延迟就大了。中低轨道的LEO/MEO卫星延迟小,但是覆盖区域有限,每天都会出现卫星切换时的网络中断。因此需要客户端和服务器端都能够保留消息收发状态,在网络恢复正常后继续发送;
有些数据发送失败,不需要重发。但是有些消息比如阀门泄露告警或控制石油管道阀门的命令,就必须要在网络有问题的情况下也要能确保发送成功。因此需要在环境允许的情况下,提供不同等级的“服务质量”。
经过10多年的发展,MQTT协议v3.1.1版已于2014年7月被OASIS组织接纳为国际标准。除了MQTT协议外,还有其他常用协议,比如CoAP、WebSocket和HTTP。因为HTTP协议较重,本文不做介绍,这里只简单比较下MQTT跟CoAP和WebSocket的区别。
MQTT基于TCP的长连接,Clients必须要知道消息格式才能通信,是多个客户端通过中央代理进行消息传递的多对多协议。CoAP(受限制的应用协议,Constrained Application Protocol)基于UDP的无连接,采用与HTTP协议相同的请求响应工作模式,CoAP内置发现支持和内容协商,能允许设备相互窥测以找到数据交换的方式,是一个在Server和Client之间传递状态信息的单对单协议。
WebSocket最初是为浏览器和服务器之间的点对点连接进行全双工通信而开发的,它可以用于嵌入式设备的连接,也可以说是物联网设备,但不能很好地保持信号。MQTT是专门为IoT设备设计的,其设计原则是最大程度地减少网络带宽并确保交付。MQTT在基本消息发送机制的基础上增加了额外的抽象,以便多个感兴趣的机器可以订阅他们感兴趣的主题。因此,有时可以按主题路由消息,以便多台机器可以共享一个共同的兴趣,服务器可以选择按主题过滤消息,但可以接收所有消息。因此,在嵌入式系统的上下文中,MQTT比WebSocket更适合,因为除了通信开销小外,MQTT在协议级别提供pub / sub,而且提供不同等级的服务质量。
上面提到的协议是处于网络协议层次的应用层,而工作在物理层的NB-IoT协议是不得不提的另一种重要的物联网通信协议。
在低功耗广域网络(Low Power Wide Area Network,简写:LPWAN) 中,实现远距离低功耗的无线通信网络技术主要包含NB-IoT、LoRa、Sigfox、eMTC四种,其中NB-IoT、LoRa近年来备受市场的关注和追捧,尤其是NB-IoT(Narrow Band-Internet of Things,窄带物联网)。NB-IoT基于蜂窝网络,可直接部署于GSM网络、UMTS网络或LTE网络,部署成本低;它的传输距离可达十几公里,每个网络单元可以支持接入50,000个设备终端。速度方面,NB-IoT的频射网络带宽为200kHz。传输速率一般在160kbps-250kbps之间;又由于NB-IoT通讯结构及设备本身的数据接收及上报模式的设置等因素,NB-IoT通讯常会具有10s内的传输延时。
NB-IoT的网络部署包含芯片、模组或终端,NB-IoT基站、NB-IoT核心网、IoT连接管理平台等部分:
NB-IoT网络通讯比较适合于终端较为分散、环境网络信号不好、没有稳定电源、传输速率要求相对较高、中&高频上行数据传输等情况.尽管因为功耗、网络覆盖范围有限以及商业模式等因素导致NB-IoT目前的市场份额相对较少,但是随着NB-IoT被纳入5G候选技术方案,NB-IoT将成为未来5G物联网主流技术,NB-IoT也将迎来新的发展机遇。现阶段,NB-IoT垂直应用行业主要集中交通行业、物流行业、卫生医疗、商品零售行业、智能抄表、公共设施、智能家居、智能农业、工业制造、企业能耗管理、企业安全防护,这些需求都成为了NB-IoT增长的市场。数据显示:NB-IoT最早商用“三表”(电表、气表、水表)领域已经在2018年实现了百万量级的出货。
本文接下来着重介绍应用层的MQTT协议及其应用开发。
Part 2 MQTT协议介绍
4 MQTT协议组成
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。
MQTT协议实现需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器。
客户端作为一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接,它可以:
(1)发布其他客户端可能会订阅的信息。
(2)订阅其它客户端发布的消息。
(3)退订或删除应用程序的消息。
(4)断开与服务器连接。
MQTT服务器也称为“消息代理”(Broker),位于消息发布者和订阅者之间,它可以:
(1)接受来自客户的网络连接;
(2)接受客户发布的应用信息;
(3)处理来自客户端的订阅和退订请求;
(4)向订阅的客户转发应用程序消息。
5 MQTT协议中核心概念
(1)会话(Session)与订阅(Subscription)
每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。
(2)主题名(Topic Name)和主题筛选器(Topic Filter)
主题名是消息的标签,主题筛选器是订阅者指定的标签通配符表达式。一个主题筛选器可以匹配到多个主题名称,服务器会将订阅所匹配到标签下的所有消息发送给订阅者。MQTT 的 主题名有层级结构,主题筛选器支持通配符 + 和 #:
MQTT 的主题是不要预先创建的,发布者发送消息到某个主题、或者订阅者订阅某个主题的时候,Broker 就会自动创建这个主题。
(3)消息
MQTT协议消息数据包由固定头、可变头、消息体三部分组成:
固定头(Fixed header),共2个字节,第一个字节的高4位定义了此消息的类型,第一个字节的低4位根据消息的类型,含有不同的含义。第二个字节声明接下来的可变头及负载的数据长度,固定头结构如下:
其中第一个字节的4-7bit为数据包消息类型,有14种:
可变头(Variable header),存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。可变头位于固定头和负载之间,其内容因数据包类型而不同,常用于包的标识:
消息体(Payload),存在于部分MQTT数据包中,表示客户端收到的具体内容,包括CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息。PUBLISH消息体内容可选,其他类型都没有消息体。
MQTT中有两种特殊的消息:Retained 消息和LWT 遗嘱消息,分别介绍如下:
Retained 消息是指在 PUBLISH 数据包中 Retain 标识设为 1 的消息,Broker 收到这样的 PUBLISH 包以后,将保存这个消息,当有一个新的订阅者订阅相应主题的时候,Broker 会马上将这个消息发送给订阅者。Retain 消息用于解决新的订阅开始时能够接收订阅之前的消息,有以下一些特点:
一个 Topic 只能有 1 条 Retained 消息,发布新的 Retained 消息将覆盖老的 Retained 消息;
如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的 Retained 消息;
只有新的订阅者才会收到 Retained 消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到 Retained 消息;
Retained 消息发送到订阅者时,消息的 Retain 标识仍然是 1,订阅者可以判断这个消息是否是 Retained 消息,以做相应的处理。
Retained 消息和持久性会话没有任何关系,Retained 消息是 Broker 为每一个 Topic 单独存储的,而持久性会话是 Broker 为每一个 Client 单独存储的。
遗嘱常用于获取设备的连接状态。当Client 非正常断开连接,将发送遗嘱消息给订阅者,Broker 在以下情况下认为 Client 是非正常断开连接的:
Broker 检测到底层的 I/O 异常;
Client 未能在 Keep Alive 的间隔内和 Broker 之间有消息交互;
Client 在关闭底层 TCP 连接前没有发送 DISCONNECT 数据包;
Broker 因为协议错误关闭和 Client 的连接,比如 Client 发送了一个格式错误的 MQTT 数据包。
如果 Client 通过发布 DISCONNECT 数据包断开连接,这个属于正常断开连接,不会触发 LWT 的机制,同时,Broker 还会丢弃掉这个 Client 在连接时指定的 LWT 参数。
(4) 服务质量 QoS
QoS在设置上由2 位 的二进制控制,且值不允许为 3(0x11):
要注意的是,QoS 是 Sender 和 Receiver 之间达成的协议,不是 Publisher 和 Subscriber 之间达成的协议。比如Publisher 发布一条 QoS1 的消息,只能保证 Broker 能至少收到一次这个消息;至于对应的 Subscriber 能否至少收到一次这个消息,还要取决于 Subscriber 在 Subscribe 的时候和 Broker 协商的 QoS 等级。
另外,在 MQTT 协议中,从 Broker 到 Subscriber 这段消息传递的实际 QoS 等于 "Publisher 发布消息时指定的 QoS 等级和 Subscriber 在订阅时与 Broker 协商的 QoS 等级,这两个 QoS 等级中的最小那一个。"这就是所谓QoS 降级。
QoS 0 :ender 不关心 Receiver 是否收到消息,它"尽力"发送消息。
QoS 1:Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息。
1)Sender 向 Receiver 发送一个带有消息数据的 PUBLISH 包, 并在本地保存这个 PUBLISH 包。
2)Receiver 收到 PUBLISH 包以后,向 Sender 发送一个 PUBACK 数据包,PUBACK 数据包没有消息体(Payload),在可变头中(Variable header)中有一个包标识(Packet Identifier),和它收到的 PUBLISH 包中的 Packet Identifier 一致。
3)Sender 收到 PUBACK 之后,根据 PUBACK 包中的 Packet Identifier 找到本地保存的 PUBLISH 包,然后丢弃掉,一次消息的发送完成。
4)如果 Sender 在一段时间内没有收到 PUBLISH 包对应的 PUBACK,它将该 PUBLISH 包的 DUP 标识设为 1(代表是重新发送的 PUBLISH 包),然后重新发送该 PUBLISH 包。重复这个流程,直到收到 PUBACK,然后执行第 3 步。
QoS 2:Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
QoS 2 使用 2 套请求/应答流程(一个 4 段的握手)来确保 Receiver 收到来自 Sender 的消息,且不重复:
1)Sender 发送 QoS 为 2 的 PUBLISH 数据包,数据包 Packet Identifier 为 P,并在本地保存该 PUBLISH 包;
2)Receiver 收到 PUBLISH 数据包以后,在本地保存 PUBLISH 包的 Packet Identifier P,并回复 Sender 一个 PUBREC 数据包,PUBREC 数据包可变头中的 Packet Identifier 为 P,没有消息体(Payload);
3)当 Sender 收到 PUBREC,它就可以安全地丢弃掉初始的 Packet Identifier 为 P 的 PUBLISH 数据包,同时保存该 PUBREC 数据包,同时回复 Receiver 一个 PUBREL 数据包,PUBREL 数据包可变头中的 Packet Identifier 为 P,没有消息体;如果 Sender 在一定时间内没有收到 PUBREC,它会把 PUBLISH 包的 DUP 标识设为 1,重新发送该 PUBLISH 数据包(Payload);
4)当 Receiver 收到 PUBREL 数据包,它可以丢弃掉保存的 PUBLISH 包的 Packet Identifier P,并回复 Sender 一个 PUBCOMP 数据包,PUBCOMP 数据包可变头中的 Packet Identifier 为 P,没有消息体(Payload);
5)当 Sender 收到 PUBCOMP 包,那么它认为数据包传输已完成,它会丢弃掉对应的 PUBREC 包。如果 Sender 在一定时间内没有收到 PUBCOMP 包,它会重新发送 PUBREL 数据包。
我们可以看到在 QoS2 中,完成一次消息的传递,Sender 和 Reciever 之间至少要发送四个数据包,QoS2 是最安全也是最慢的一种 QoS 等级了。
6 MQTT协议格式举例
(1)CONNECT - 连接请求报文
客户端到服务端的网络连接建立后,客户端发送给服务端的第一个报文必须是CONNECT, 连接服务端报文。其格式如下图:
在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接。CONNECT报文头第一个字节固定为0x01,代表CONNECT报文,第二个字节代表余下的数据包字节长度。可变报头按下列次序包含四个字段:
协议名(Protocol Name),值固定为字符 “MQTT”的UTF-8编码的字符串,MQTT规范的后续版本不会改变这个字符串的偏移和长度。占用6个字节。
协议级别(Protocol Level),对 MQTT 3.1.1 来说,值为 4,占用1个字节。
对于3.1.1版协议,协议级别字段的值是4(0x04)。如果发现不支持的协议级别,服务端必须给发送一个返回码为0x01(不支持的协议级别)的CONNACK报文响应CONNECT报文,然后断开客户端的连接。
连接标志(Connect Flags):连接标志字节包含一些用于指定MQTT连接行为的参数。它还指出有效载荷中的字段是否存在。byte8[0]必须为0,否则断开连接。
用户名标识(User Name Flag):消息体中是1否0有用户名字段。
密码标识(Password Flag):消息体中是1否0有密码字段。
遗嘱消息 Retain 标识(Will Retain):标识遗嘱消息是1否0是 Retain 消息。如果 遗嘱标识 被设置为0,遗嘱保留(Will Retain)标志也必须设置为0。如果遗嘱标志被设置为1:
如果遗嘱保留被设置为0,服务端必须将遗嘱消息当作非保留消息发布。
如果遗嘱保留被设置为1,服务端必须将遗嘱消息当作保留消息发布。
遗嘱消息 QOS 标识(Will Qos):标识遗嘱消息的 Qos,2bit。如果遗嘱标志被设置为0,遗嘱QoS也必须设置为0(0x00)。
遗嘱标识(Will Flag):标识是1否0使用遗嘱消息。
会话清除标识(Clean Session):标识 Client 是0否1建立一个持久化的会话。当 Clean Session 的标识设为 0 时,代表 Client 希望建立一个持久会话的连接,Broker 将存储该 Client 订阅的主题和未接受的消息,否则(设置为1) Broker 不会存储这些数据,同时在建立连接时清除这个 Client 之前存在的持久化会话所保存的数据。持久会话只在 QoS 等级 大于等于 1 的消息中有效。
连接保活(Keep Alive): 设置一个单位为秒的时间间隔,指在 client 传输完成一个控制报文的时刻到发送下一个报文的时刻,client 与 broker 两者之间允许空闲的最大时间间隔(单位:秒),MQTT 协议中约定:在 1.5*Keep Alive 的时间间隔内,如果 Broker 没有收到来自 Client 的任何数据包,那么 Broker 认为它和 Client 之间的连接已经断开;同样地, 如果 Client 没有收到来自 Broker 的任何数据包,那么 Client 认为它和 Broker 之间的连接已经断开。MQTT 还有一对 PINGREQ/PINGRESP 数据包,当 Broker 和 Client 之间没有任何数据包传输的时候,可以通过 PINGREQ/PINGRESP 来满足 Keep Alive 的约定和侦测连接状态。
CONNECT的payload字段是根据可变报头的连接标志决定是否存在,如果存在,必须按照客户端标识符(client ID)、遗嘱主题(will topic)、用户名(user name)和密码(password)的顺序出现,其中客户端标识符(client ID)必须存在而且必须是CONNECT报文有效载荷的第1个字段,必须用UTF-8进行编码。
服务端使用客户端标识符 (Client Id) 识别客户端。连接服务端的每个客户端都有唯一的客户端标识符(Client Id)。遗嘱主题(will topic)、用户名(user name)和密码(password)这三个字段分别由一个两字节的长度和消息的有效载荷组成,表示为零字节或多个字节序列。长度给出了跟在后面的数据的字节数,不包含长度字段本身占用的两个字节。
(2)CONNACK – 确认连接请求报文
服务端发送CONNACK报文响应从客户端收到的CONNECT报文。服务端发送给客户端的第一个报文必须是CONNACK。如果客户端在合理的时间内没有收到服务端的CONNACK报文,客户端应该关闭网络连接。报文格式如下图:
除了固定头,可变报文图中有两个字段:连接确认标志 (Connect Acknowledge Flags)和连接返回码 (Connect Return code)。
连接确认标志 Connect Acknowledge Flags 只用了第0位,第0 (也叫 SP)位是当前会话(Session Present)标志。如果服务端收到清理会话(Clean Session)标志为1的连接,除了将CONNACK报文中的返回码设置为0之外,还必须将CONNACK报文中的当前会话设置(Session Present)标志为0 。
如果服务端收到一个Clean Session为0的连接,当前会话标志的值取决于服务端是否已经保存了ClientId对应客户端的会话状态。
如果服务端已经保存了会话状态,它必须将CONNACK报文中的SP设置为1。
如果服务端没有已保存的会话状态,它必须将CONNACK报文中的SP设置为0;还需要将CONNACK报文中的返回码设置为0。(此时代表连接成功)
SP使服务端和客户端在是否有已存储的会话状态上保持一致。
一旦完成了会话的初始化设置,已经保存会话状态的客户端将期望服务端维持它存储的会话状态。如果客户端从服务端收到的当前的值与预期的不同,客户端可以选择继续这个会话或者断开连接。客户端可以丢弃客户端和服务端之间的会话状态,方法是,断开连接,将清理会话标志设置为1,再次连接,然后再次断开连接。
如果服务端发送了一个包含非零返回码的CONNACK报文,它必须将当前会话标志设置为0。
连接返回码 Connect Return code 使用一个字节的无符号值,在下表中列出:
(3)PUBLISH – 发布消息报文
PUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。客户端使用PUBLISH报文发送应用消息给服务端,目的是分发到其它订阅匹配的客户端。服务端使用PUBLISH报文发送应用消息给每一个订阅匹配的客户端。其格式如下图:
第一个字节为固定头,其中高4位为消息类型(0x03),低4位有3个重要标志位:
重发标志 DUP:如果DUP标志被设置为0:表示这是客户端或服务端第一次请求发送这个PUBLISH报文。对于QoS 0的消息,DUP标志必须设置为0。如果DUP标志被设置为1:表示这可能是一个早前报文请求的重发。客户端或服务端请求重发一个PUBLISH报文时,必须将DUP标志设置为1。
服务质量等级 QoS:PUBLISH报文不能将QoS所有的位设置为1。如果服务端或客户端收到QoS所有位都为1的PUBLISH报文,它必须关闭网络连接。当QoS设置为1时,客户端或服务器发布消息时,需要得到对方的确认(PUBACK),如果一段时间后没收到PUBACK,那么会再次发送当前消息,并将DUP字段标记为1。
保留标志 RETAIN:如果服务端收到一条保留(RETAIN)标志为1的QoS 0消息,它必须丢弃之前为那个主题保留的任何消息。它应该将这个新的QoS 0消息当作那个主题的新保留消息,但是任何时候都可以选择丢弃它 — 如果这种情况发生了,那个主题将没有保留消息。
服务端发送PUBLISH报文给客户端时,如果消息是作为客户端一个新订阅的结果发送,它必须将报文的保留标志设为1 [MQTT-3.3.1-8]。当一个PUBLISH报文发送给客户端是因为匹配一个已建立的订阅时,服务端必须将保留标志设为0,不管它收到的这个消息中保留标志的值是多少。
保留标志为1且有效载荷为零字节的PUBLISH报文会被服务端当作正常消息处理,它会被发送给订阅主题匹配的客户端。此外,同一个主题下任何现存的保留消息必须被移除,因此这个主题之后的任何订阅者都不会收到一个保留消息。
PUBLISH 的可变头
可变报头按顺序包含主题名(Topic Name)和报文标识符(Packet Identifier)。
主题名(Topic Name)用于识别有效载荷数据应该被发布到哪一个信息通道。服务端发送给订阅客户端的PUBLISH报文的主题名必须匹配该订阅的主题过滤器。
报文标识符 Packet Identifier:只有当QoS等级是1或2时,报文标识符(Packet Identifier)字段才能出现在PUBLISH报文中。QoS等于0的PUBLISH报文不能包含报文标识符。
报文标识符用来区分报文,特别是在重发的报文中用来标识是否是同一个报文,并在需要应答的场景中用于确定是对哪个发送报文的应答。可变报头的报文标识符(Packet Identifier)字段存在于在多个类型的报文里(占用2个字节)。
PUBLISH 报文的接收者必须按照根据PUBLISH报文中的QoS等级发送响应,响应报文参见QoS流程。
(4)SUBSCRIBE 订阅报文
客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅。每个订阅注册客户端关心的一个或多个主题。为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端。SUBSCRIBE报文格式如下图:
SUBSCRIBE报文的有效载荷必须包含至少一对主题过滤器 和 QoS等级字段组合。每一个过滤器后面跟着一个字节,这个字节被叫做 服务质量要求(Requested QoS)。它给出了服务端向客户端发送应用消息所允许的最大QoS等级。请求的最大服务质量等级QoS:字段编码为一个字节,主题过滤器 和 QoS等级组合是连续地打包。
如果服务端收到一个SUBSCRIBE报文,报文的主题过滤器与一个现存订阅的主题过滤器相同,那么必须使用新的订阅彻底替换现存的订阅。新订阅的主题过滤器和之前订阅的相同,但是它的最大QoS值可以不同。与这个主题过滤器匹配的任何现存的保留消息必须被重发,但是发布流程不能中断。
如果主题过滤器不同于任何现存订阅的过滤器,服务端会创建一个新的订阅并发送所有匹配的保留消息。
如果服务端收到包含多个主题过滤器的SUBSCRIBE报文,它必须如同收到了一系列的多个SUBSCRIBE报文一样处理那个,除了需要将它们的响应合并到一个单独的SUBACK报文发送 [MQTT-3.8.4-4]。
客户端使用带通配符的主题过滤器请求订阅时,客户端的订阅可能会重复,因此发布的消息可能会匹配多个过滤器。对于这种情况,服务端必须将消息分发给所有订阅匹配的QoS等级最高的客户端。服务端之后可以按照订阅的QoS等级,分发消息的副本给每一个匹配的订阅者。
(5)SUBACK 订阅确认报文
服务端收到客户端发送的一个SUBSCRIBE报文时,必须使用SUBACK报文响应,报文格式如下图:
固定报文头为MQTT控制报文类型 (0x9),剩余长度字段:等于可变报头的长度加上有效载荷的长度。可变报头包含等待确认的SUBSCRIBE报文的报文标识符,占用2个字节。SUBACK报文必须和等待确认的SUBSCRIBE报文有相同的报文标识符。SUBACK报文包含一个返回码清单,它们指定了SUBSCRIBE请求的每个订阅被授予的最大QoS等级。返回码的顺序必须和SUBSCRIBE报文中主题过滤器的顺序相同。
Part 3 MQTT应用开发
7 MQTT 应用开发详解
MQTT 应用开发涉及到客户端和服务器两部分,客户端主要关注消息的发送和消费,服务器主要关注消息的持久化存储和系统的性能和可靠性。特别是服务器有不少开源系统可用,比如RabbitMQ、ActiveMQ、Mosquitto和moquette等,都可以支持多种不同的协议。另外IOTDB基于moquette也支持MQTT 应用。
本文介绍基于moquette和RabbitMQ如何开发MQTT应用程序。
(1)基于moquette开发服务器
moquette基于Netty实现了MQTT相关协议,提供了独立部署方式和嵌入式方式,便于测试和研究,这里介绍嵌入式开发方式。
嵌入式开发只需要引入以下包即可:
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.13</version>
</dependency>
开发服务器需要实例化io.moquette.broker.Server,并设置对应的Hander和配置文件,通过startServer方法启动服务器。
实现Handler只需要继承AbstractInterceptHandler,并重写onPublish方法,这样就可以处理客户端发送的消息,比如这里简单的打印消息结构的头信息:
(2)基于RabbitMQ开发客户端应用程序
基于RabbitMQ省去了服务器的开发工作,将重心放在消息的生产和消费。这里使用到Eclipse Paho Java Client库,其提供了MQTT的同步和异步调用的API。使用客户端库需要引入依赖包:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
生产端和客户端开发都需要首先实例化MqttClient和连接配置信息,代码如下:
mqttClient = new MqttClient(host, clientId, new MemoryPersistence())
//配置连接的选项,MqttConnectOptions包含控制客户端连接到服务器的方式的选项。
MqttConnectOptions options = new MqttConnectOptions();
//设置连接用户名和密码
options.setUserName(userName);
options.setPassword(password.toCharArray());
//设置超时时间
options.setConnectionTimeout(3);
//设置心跳时间间隔
options.setKeepAliveInterval(3);
//设置服务器是否应该记住重新连接时客户端的状态
options.setCleanSession(true);
mqttClient.connect(options);
//设置消息发送后的回调方法
mqttClient.setCallback(new MqttCallbackImpl());
//通过字符串获取MqttTopic类型的主题
topic = mqttClient.getTopic(t);
其中host连接地址采用tcp://host:port格式。如果是生产端,就可以通过如下方式发送消息:
MqttMessage message = new MqttMessage();
message.setQos(0);
message.setPayload("hello MQTT from MQTTPublisher!".getBytes());
opic.publish(message);
如果是消费端,首先通过
mqttClient.connect(options);
建立连接,再订阅topic:
String[] topic = {ConfigInfo.topic};
int qos[] = {2};
mqttClient.subscribe(topic, qos);
其中topic和qos都是数组形式,分别表示订阅的topic过滤器和服务质量等级。
最后实现回调接口MqttCallback,该接口提供了3个接口:
connectionLost:当连接丢失触发该方法调用
messageArrived:当消息达到订阅客户端触发该方法调用,该方法被客户端同步调用,并且正常执行完会发送消息确认,如果方法实现抛出异常会导致客户端异常退出。
deliveryComplete:当消息投递完成,并且收到所有消息确认,触发该方法调用。具体来说,如果服务质量是QoS0,消息发送到网络即触发方法,如果是QoS1,当收到PUBACK报文即触发方法,如果是QoS2,当收到PUBCOMP报文即触发方法。
这里实现简单的打印信息:
8 参考资料
https://www.cnblogs.com/schips/p/12267372.html
http://www.lpwap.com/lora-university-case/201710142310/
https://www.rabbitmq.com/web-mqtt.html
https://github.com/moquette-io/moquette
https://www.eclipse.org/paho/index.php?page=clients/java/index.php
本文提供pdf版本,用于后续更新,回复关键字 “MQTT介绍与应用开发” 获取后续更新。更多文章,欢迎扫码关注:
以上是关于物联网数据传输协议MQTT介绍与应用开发详解的主要内容,如果未能解决你的问题,请参考以下文章