MQTT 学习笔记以及在项目中的实际运用
Posted 小羊子说
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQTT 学习笔记以及在项目中的实际运用相关的知识,希望对你有一定的参考价值。
文章目录
前言
最近工作中有用到MQTT 协议,在此做一个研究学习总结和采坑指南。如果你在工作中也有用到,欢迎交流。文中如有不足之处,欢迎指正。
1. 什么是MQTT
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级协议,该协议构建于TCP/IP协议之上,MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
小结:MQTT 已经成为物联网系统事实上的网络协议标准。如果你想从事物联网开发,就一定要掌握 MQTT。
2. 感受MQTT的使用
安装 MQTT,可以直接找到 hbmqtt (python语言)这样的项目拿来用。它的背后有 Eclipse 基金会的支持。
类似的 MQTT Broker 软件,你还可以选择基于 C 语言的Mosquitto,基于 Erlang 语言的VerneMQ等。
至于 MQTT 的客户端(Client)实现,也有成熟的 Python、C、Java 和 javascript 等各种编程语言的开源实现,供你参考、使用,比如Eclipse Paho 项目。
而且,还有很多商业公司在持续运营功能更丰富、支持更完备的商业版 Broker 实现,比如提供高并发能力的集群特性、方便拓展的插件机制等。这些会大大提高我们技术开发者的工作效率。比如中国一个团队开发、维护的 EMQ X,它已经完整地支持 MQTT5.0 协议。
测试工具推荐:MQTT X 可以在线测试或下载本地客户端进行mqtt连接测试和事件的主题订阅、消息发送等。
有助于定位问题,强烈推荐。
另外,生态完善还有一个好处,那就是作为开发者,当你遇到难题时,可以很方便地找到很多相关的资料;就算资料解决不了问题,你还可以去社区中提问,寻求高手的帮助。这在实际工作中非常有用。顺便提一下,除了老牌的 Stack Overflow,你还可以关注一下 GitHub 的 Issues 模块,因为在那里可以找到很多专家。
小结:做为android开发者,我已经找出了众多开源项目的官方开源地址,如想快速体验,请移步:
在采坑中发现,有些问题是存在的,比如Andorid Q之后的前台服务等问题,有些fork分支解决了部分问题,如果你在项目中也遇到了相似问题,先看看issus中的解决方案,甚至可以直接使用fork分支上的开源项目。
小结: mqttt 使用上手是非常快的,测试和工具使用可以很快感觉mqtt的流程,体验后可以根据项目的技术指标在研究下第三方如阿里云mqtt自己搭建服务器时性能指标。然后再根据业务需求,选择第三方平台和自己搭建服务器。
3. MQTT的生态相对完善
当然,阿里云、华为云、腾讯云和微软 Azure 这些大厂,之所以不约而同地选择 MQTT 协议作为物联网设备的“第一语言”,不仅是因为 MQTT 的生态完善,MQTT 协议本身的优秀设计也是重要的因素。
它在设计上的优点体现在哪呢?我想主要有五个方面:
-
契合物联网大部分应用场景的发布 - 订阅模式。
-
能够满足物联网中资源受限设备需要的轻量级特性。
-
时刻关注物联网设备低功耗需求的优化设计。
-
针对物联网中多变的网络环境提供的多种服务质量等级。
-
支持在物联网应用中越来越被重视的数据安全。
3.1 发布 - 订阅模式
mqtt采用了发布 - 订阅模式,MQTT 协议具有很多优点,比如能让一个传感器数据触发一系列动作;网络不稳定造成的临时离线不会影响工作;方便根据需求动态调整系统规模等。这使得它能满足绝大部分物联网场景的需求。
3.2 轻量级协议:减少传输数据量
MQTT 是一个轻量级的网络协议,这一点也是它在物联网系统中流行的重要原因。毕竟物联网中大量的都是计算资源有限、网络带宽低的设备。
这种“轻量级”体现在两个方面。一方面,MQTT 消息采用二进制的编码格式,而不是 HTTP 协议那样的文本的表述方式。
这有什么好处呢?那就是可以充分利用字节位,协议头可以很紧凑,从而尽量减少需要通过网络传输的数据量。
比如,我们分析 HTTP 的一个请求抓包,它的消息内容是下面这样的(注意:空格和回车、换行符都是消息的组成部分):
GET /account HTTP/1.1 <--注释:HTTP请求行
Host: time.geekbang.com <--注释:以下为HTTP请求头部
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:81.0) Gecko/20100101 Firefox/81.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Encoding: gzip, deflate, sdch
Accept-Language: zh-CN,zh;q=0.8,en;q=0.6
<--注释:这个空行是必须的,即使下面的请求体是空的
在 HTTP 协议传输的这段文本中,每个字符都要占用 1 个字节。而如果使用 MQTT 协议,一个字节就可以表示很多内容。下面的图片展示了 MQTT 的固定头的格式,这个固定头只有 2 个字节:
你可以看到,第一个字节分成了高 4 位(4~7)和低 4 位(0~3);低 4 位是数据包标识位,其中的每一比特位又可以表示不同的含义;高 4 位是不同数据包类型的标识位。
第二个字节表示数据包头部和消息体的字节共个数,其中最高位表示有没有第三字节存在,来和第二个字节一起表示字节共个数。
如果有第三个字节,那它的最高位表示是否有第四个字节,来和第二个字节、第三个字节一起表示字节总个数。依此类推,还可能有第四个字节、第五个字节,不过这个表示可变头部和消息体的字节个数的部分,最多也只能到第五个字节,所以可以表示的最大数据包长度有 256MB。
比如,一个请求建立连接的 CONNECT 类型数据包,头部需要 14 个字节;发布消息的 PUBLISH 类型数据包头部只有 2~4 个字节。
轻量级的另一方面,体现在消息的具体交互流程设计非常简单,所以 MQTT 的交互消息类型也非常少。为了方便后面的讲解,我在这里整理了一个表格,总结了 MQTT 不同的数据包类型的功能和发消息的流向。
从表格可以看出,MQTT 3.1.1 版本一共定义了 14 种数据包的类型,在第一个字节的高 4 位中分别对应从 1 到 14 的数值。
3.3 低功耗优化:节约电量和网络资源
除了让协议足够轻量,MQTT 协议还很注重低功耗的优化设计,这主要体现在对能耗和通信次数的优化。
比如,MQTT 协议有一个 Keepalive 机制。它的作用是,在 Client 和 Broker 的连接中断时,让双方能及时发现,并重新建立 MQTT 连接,保证主题消息的可靠传输。
这个机制工作的原理是:Client 和 Broker 都基于 Keepalive 确定的时间长度,来判断一段时间内是否有消息在双方之间传输。这个 Keepalive 时间长度是在 Client 建立连接时设置的,如果超出这个时间长度,双方没有收到新的数据包,那么就判定连接断开。
虽然 Keepalive 要求一段时间内必须有数据包传输,但实际情况是,Client 和 Broker 不可能时时刻刻都在传输主题消息,这要怎么办呢?
MQTT 协议的解决方案是,定义了 PINGREQ 和 PINGRESP 这两种消息类型。它们都没有可变头部和消息体,也就是说都只有 2 个字节大小。Client 和 Broker 通过分别发送 PINGREQ 和 PINGRESP 消息,就能够满足 Keepalive 机制的要求。
嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。
我猜你也想到了,如果要一直这样“傻傻地”定期发送消息,那也太浪费电量和网络资源了。所以,如果在 Keepalive 时间长度内,Client 和 Broker 之间有数据传输,那么 Keepalive 机制也会将其计算在内,这样就不需要再通过发送 PINGREQ 和 PINGRESP 消息来判断了。
除了 Keepalive 机制,MQTT 5.0 中的重复主题特性也能帮助我们节省网络资源。
Client 在重复发送一个主题的消息时,可以从第二次开始,将主题名长度设置为 0,这样 Broker 会自动按照上次的主题来处理消息。这种情况对传感器设备来说十分常见,所以这个特性在工作中很有实际意义。
举例说明:比如发生是主题是“/xxx/iot”,第一次需要在信息体中包含这个主题的字符,第二次就可以省掉这个主题名字了。
3.4 三 种 QoS 级别:可靠通信
除了计算资源有限、网络带宽低,物联网设备还经常遇到网络环境不稳定的问题,尤其是在移动通信、卫星通信这样的场景下。比如共享单车,如果用户已经锁车的这个消息,不能可靠地上传到服务器,那么计费就会出现错误,结果引起用户的抱怨。这样怎么应对呢?
这个问题产生的背景就是不稳定的通信条件,所以 MQTT 协议设计了 3 种不同的 QoS (Quality of Service,服务质量)级别。你可以根据场景灵活选择,在不同环境下保证通信是可靠的。
这 3 种级别分别是:
-
QoS 0,表示消息最多收到一次,即消息可能丢失,但是不会重复。
(注意:网络有很多博客说可能会重复,这里可以抓包确认,有疑问的请务必去测试验证。这里我理解不只管发,不管收,所以会丢失,但是不会重复)
-
QoS 1,表示消息至少收到一次,即消息保证送达,但是可能重复。
-
QoS 2,表示消息只会收到一次,即消息有且只有一次。
图片中展示了qos各自的特点。可以看到,QoS 0 和 QoS 1 的流程相对比较简单;而 QoS 2 为了保证有且只有一次的可靠传输,流程相对复杂些。
正常情况下,QoS 2 有 PUBLISH、PUBREC、PUBREL 和 PUBCOMP 4 次交互。
至于“不正常的情况”,发送方就需要重复发送消息。比如一段时间内没有收到 PUBREC 消息,就需要再次发送 PUBLISH 消息。不过要注意,这时要把消息中的 “重复”标识设置为 1,以便接收方能正确处理。同样地,如果没有收到 PUBCOMP 消息,发送方就需要再次发送 PUBREL 消息。
剖析到这里,MQTT 协议本身的主要特性我就介绍完了,我们已经为在实战篇编写 MQTT 的相关通信代码做好了准备。但是,我还想跟你补充一个跟生产环境有关的知识点,那就是数据安全传输。
3.5 安全传输
说到安全传输,首先我们需要验证 Client 是否有权限接入 MQTT Broker。为了控制 Client 的接入,MQTT 提供了用户名 / 密码的机制。在建立连接过程中,它可以通过判断用户名和密码的正确性,来筛选有效连接请求。
但是光靠这个机制,还不能保证网络通信过程中的数据安全。因为在明文传输的方式下,不止设备数据,甚至用户名和密码都可能被其他人从网络上截获而导致泄漏,于是其他人就可以伪装成合法的设备发送数据。所以,我们还需要通信加密技术的支持。
MQTT 协议支持 SSL/TLS 加密通信方式。采用 SSL/TLS 加密之后,MQTT 将转换为 MQTTS。这有点类似于 HTTP 和 HTTPS 的关系。
小结:
总结一下,在这一讲中,我带你体验了使用 MQTT 协议的通信过程,同时也为你介绍了 MQTT 协议的几个特点。
- MQTT 协议的生态很好。比如,MQTT 协议的代码实现非常丰富,C 语言的有 Mosquitto,Python 语言有 Eclipse hbmqtt,而且有商业公司在运营相关软件解决方案。这表明 MQTT 协议很成熟。
- MQTT 协议采用了适合物联网应用场景的发布 - 订阅模式。当然,我也提到了 MQTT 5.0 中同样增加了请求 - 响应模式,便于部分场景中的开发使用。
- MQTT 协议采用二进制的消息内容编码格式,协议很精简,协议交互也简单,这些特点保证了网络传输流量很小。所以客户端(包括发布者和订阅者角色)的代码实现可以短小精悍,比如 C 语言的实现大概只占 30KB 的存储空间,Java 语言也只需要 100KB 左右大小的代码体积。
- MQTT 协议在设计上考虑了很多物联网设备的低功耗需求,比如 Keepalive 机制中精简的 PINGREQ 和 PINGRESP 这两种消息类型,还有 MQTT 5.0 中新增加的重复主题特性。这也再次印证了 MQTT 的定位非常明确,那就是专注于物联网场景。
- MQTT 在消息的可靠传输和安全性上,也有完整的支持,可以说“简约而不简单”。
4. 实现方式
MQTT协议中,需要两个端,分别是服务端和客户端;有三个身份,分别是发布者(Publish),代理(Broker)、订阅者(Subscribe)。
在MQTT协议中,客户端不能直接对客户端实行端到端的收发消息,必须经过服务端管理分配,所以服务端要运行一个代理服务,也就是三个身份之一的代理身份。消息的发送方称为发布者,该消息的需要接收者称之为订阅者。发布者把消息发送给代理,代理负责检查都谁需要接收这个消息(这个就是订阅),需要的就转发给它。因为要识别不同的消息,所以MQTT协议制定了主题标准,也就是给消息加上了标签。发布者发送的消息总是要带上标签的,代理根据谁订阅了这个标签来决定转发给谁,这个标签就称之为主题(Topic),标签携带我们需要传输的信息内容称之为负载(Payload)。
为了实现异常中断通知机制,所以在客户端与服务端首次连接的时候,就要携带一条相对特殊的主题,主题内容是如果自己掉线了,希望告知需要知道自己掉线一方一些信息,这就是遗嘱消息(Will Message)。这个主题自己在线时,代理不会做任何转发,当自己掉线达到一定时间(即心跳间隔Keep Alive timer),代理会检查谁订阅了这个主题,就转发给谁(当然可以多人订阅),这就实现了异常中断(掉线)通知。
下面继续深入理解一下MQTT协议的工作过程。
发布者、订阅者、代理与主题发布与订阅间的关系:
5. 实现Demo
Talk is cheap,show me code.
根据调研的情况,实现了阿里云mqtt和能用的mqttDemo. 解决了参考其他demo会出现这样或那样的问题, 同时实现了mqtt性能的收发数据时间测试功能,最近会一直维护,欢迎star。
6. FAQ
-
连接失败问题
请检查ClientID中是否是唯一的,一般以时间戳或uuid做为连接的clientID
-
消息重复或丢失
- 请检查qos的设置值。默认值为0,根据业务请检查。
- 连接参数的属性设置
/**
* 生成默认的连接配置
*/
private val generateConnectOptions: MqttConnectOptions by lazy
MqttConnectOptions().apply
// 设置超时时间 单位为秒
connectionTimeout = 8
// 设置会话心跳时间 单位为秒 服务器会每隔60秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
// MQTT 协议中约定:在 1.5*Keep Alive 的时间间隔内,如果 Broker 没有收到来自 Client 的任何数据包,那么 Broker 认为它和 Client 之间的连接已经断开;
// 同样地, 如果 Client 没有收到来自 Broker 的任何数据包,那么 Client 认为它和 Broker 之间的连接已经断开。
keepAliveInterval = 60
isAutomaticReconnect = false
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
// 如果cleanSession设置为false的话,不用每次启动app都订阅,第一次订阅后 后面只执行连接操作即可。这里设置每次连接时都重新订阅一次
isCleanSession = true
userName = mConfig?.getUserName()?.trim()
password = mConfig?.getPassword()?.trim()?.toCharArray()
注意以上代码中 用到了by lazy
, 一旦初始化成功,再次调用时不更新其中上的userName
和password
.
此处在开发中遇到了中这个小坑。如果服务器返回的配置信息,账号密码有变化时,用户在频繁操作过程中,都需要重新 初始化,就不能用by lazy
.
解决办法:
/**
* 生成默认的连接配置
* 账号密码需要同步更新,防止出现无权连接的问题
*/
private fun getMqttConnectOptions(config: MqttConfig): MqttConnectOptions
MqttConnectOptions().apply
// 设置超时时间 单位为秒
connectionTimeout = 8
// 设置会话心跳时间 单位为秒 服务器会每隔60秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
// MQTT 协议中约定:在 1.5*Keep Alive 的时间间隔内,如果 Broker 没有收到来自 Client 的任何数据包,那么 Broker 认为它和 Client 之间的连接已经断开;
// 同样地, 如果 Client 没有收到来自 Broker 的任何数据包,那么 Client 认为它和 Broker 之间的连接已经断开。
keepAliveInterval = 60
isAutomaticReconnect = false
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
// 如果cleanSession设置为false的话,不用每次启动app都订阅,第一次订阅后 后面只执行连接操作即可。这里设置每次连接时都重新订阅一次
isCleanSession = true
userName = config.getUserName().trim()
password = config.getPassword().trim().toCharArray()
return this
在调用connect
时需要将getMqttConnectOptions
中的连接参数进行同步。
-
连接成功,订阅时失败
请检查connect连接的接口是否选择正确,开发中遇到,Android connect中的接口有好几个,注意不要选择错了。
-
混淆问题,官方文档中没有说明哪些类需要混淆,如果打包时添加了混淆,会出现connect fail的问题
no NetworkModule installed for scheme “tcp” of URI "tcp://tt020.qa.fit.com:1883
原因是找不到这个类
解决方案:取消相关mqtt类的混淆
Try to add these lines to your proguard-rules.pro file
-keep class org.eclipse.paho.client.mqttv3.internal.* ;
-keep class org.eclipse.paho.client.mqttv3.spi. *;
解决方案来源:参考连接
7. 出现无权连接的问题 (2020.2.3号新增)
查询很多资料,显示的是解决办法是:
您的应用程序的两个实例都可能尝试使用相同的clientId进行连接.
MQTT协议要求每个客户端连接使用唯一的clientId.
尝试解决未果,不排除这种可能性,但是还有其他原因也会导致“无权连接的问题”。
经过排查,问题出在连接时的账号密码 与实例化的MqttAndroidClient中的serverUri和ClientID不匹配。
如果 重新实例了MqttAndroidClient,连接的参数(账号、密码)也要一一对应上。
8 .连接时出现以下错误:(2021.2.10 新增)
Can’t find bundle for base name org.eclipse.paho.client.mqttv3.internal.nls.logcat, locale zh_CN
未找到原因,clean 项目后再执行又未出现,关注中。
可尝试这样解决:检查build.gradle中的packagingOptions配置。是否有这样的配置:
packagingOptions exclude ‘META-INF/*’ 有就删除了再试。
packagingOptions has any question?
I hava a sample problem,because of we have been set packagingOptions as: packagingOptions exclude ‘META-INF/*’ this setting will to do filter logcat.propties file, you can delete it。
最后,如果你也在使用Mqtt,遇到了什么问题,欢迎留言交流。
参考:
EMQ文档非常完善,推荐一看
转战物联网·基础篇05-通俗理解MQTT协议的实现原理和异步方式
扩展阅读:
可以了解一下sdk的调用及完善,同时在接入mqtt时,可以考虑一下如何封住成sdk,方便复用。
阿里云消息队列MQTT踩坑之路(阿里云MQTT Android客户端)
Android使用Mqtt协议链接ActiveMQ服务器实现推送
以上是关于MQTT 学习笔记以及在项目中的实际运用的主要内容,如果未能解决你的问题,请参考以下文章