MQTT:Massage Queue Telemetry Transport消息队列遥测传输

Posted Kris_u

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQTT:Massage Queue Telemetry Transport消息队列遥测传输相关的知识,希望对你有一定的参考价值。

MQTT是一个客户端服务端(C/S)架构发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、简单、规范,因此易于实现。适用场景:包括受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT),这些场景要求很小的代码封装或者网络带宽非常昂贵。

本协议运行在TCP/IP,或其它提供了有序、可靠、双向连接的网络连接上。它有以下特点:

  • 使用发布/订阅消息模式,提供了一对多的消息分发和应用之间的解耦。
  • 消息传输不需要知道负载内容。
  • 提供三种等级的服务质量:

    • 最多一次”,尽操作环境所能提供的最大努力分发消息。消息可能会丢失。例如,这个等级可用于环境传感器数据,单次的数据丢失没关系,因为不久之后会再次发送。
    • 至少一次”,保证消息可以到达,但是可能会重复。
    • 仅一次”,保证消息只到达一次。例如,这个等级可用在一个计费系统中,这里如果消息重复或丢失会导致不正确的收费。
  • 很小的传输消耗和协议数据交换,最大限度减少网络流量。
  • 异常连接断开发生时,能通知到相关各方。

应用消息 Application Message MQTT协议通过网络传输应用数据。应用消息通过MQTT传输时,它们有关联的服务质量(QoS)和主题(Topic)

客户端 Client
使用MQTT的程序或设备。客户端总是通过网络连接到服务端。它可以

  • 打开连接到服务端的网络连接
  • 发布应用消息给其它相关的客户端
  • 订阅以请求接受相关的应用消息
  • 取消订阅以移除接受应用消息的请求
  • 关闭连接到服务端的网络连接

服务端 Server
一个程序或设备,作为发送消息的客户端和请求订阅的客户端之间的中介。服务端

  • 接受来自客户端的网络连接
  • 接受客户端发布的应用消息
  • 处理客户端的订阅和取消订阅请求
  • 转发应用消息给符合条件的已订阅客户端
  • 关闭来自客户端的网络连接

会话 Subscription
客户端和服务端之间的状态交互。一些会话持续时长与网络连接一样,另一些可以在客户端和服务端的多个连续网络连接间扩展。

订阅 Subscription
订阅包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。订阅与单个会话(Session)关联。会话可以包含多于一个的订阅。会话的每个订阅都有一个不同的主题过滤器。

共享订阅 Shared Subscription
一个共享订阅包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。一个共享订阅可以与多个订阅会话相关联,便于支持大范围消息交换模式。一条主题匹配的应用消息只发送给关联到此共享订阅的多个会话中的一个会话。一个会话可以包括多个共享订阅,可以同时包含共享订阅与非共享订阅。

通配符订阅 Wildcard Subscription
通配符订阅是指主题过滤器(Topic Filter)包含一个或多个通配符的订阅。通配符订阅使得一次订阅匹配多个主题名(Topic Name)

主题名 Topic Name
附加在应用消息上的一个标签,服务端已知且与订阅匹配。服务端发送应用消息的一个副本给每一个匹配的客户端订阅。

主题过滤器 Topic Filter
订阅中包含的一个表达式,用于表示相关的一个或多个主题。主题过滤器可以使用通配符。

MQTT控制报文 MQTT Control Packet
通过网络连接发送的信息数据包。MQTT 规范定义了十四种不同类型的MQTT控制报文,其中一个(PUBLISH 报文)用于传输应用消息

无效报文 Malformed Packet
根据规范不能被正确解析的控制报文

协议错误 Protocol Error
在报文解析之后发现包含协议不允许或与客户端或服务端当前状态不一致的数据的错误。

遗嘱消息 Will Message
在网络连接非正常关闭的情况下,由服务端发布的应用消息。

MQTT控制报文的结构 Structure of an MQTT Control Packet

Fixed Header固定报头,所有控制报文都包含
Variable Header 可变报头,部分控制报文包含
Payload 有效载荷,部分控制报文包含

固定报头的格式 Fixed Header format 

比特位76543210
byte 1MQTT控制报文的类型用于指定控制报文类型的标志位
byte 2...剩余长度

 剩余长度字段
剩余长度字段
剩余长度等于可变报头的长度加上有效载荷的长度。编码方式为变长字节整数。

MQTT控制报文的类型 MQTT Control Packet types 

名字报文流动方向描述
Reserved0禁止保留
CONNECT1客户端到服务端客户端请求连接服务端
CONNACK2服务端到客户端连接报文确认
PUBLISH3两个方向都允许发布消息
PUBACK4两个方向都允许QoS 1消息发布收到确认
PUBREC5两个方向都允许发布收到(保证交付第一步)
PUBREL6两个方向都允许发布释放(保证交付第二步)
PUBCOMP7两个方向都允许QoS 2消息发布完成(保证交互第三步)
SUBSCRIBE8客户端到服务端客户端订阅请求
SUBACK9服务端到客户端订阅请求报文确认
UNSUBSCRIBE10客户端到服务端客户端取消订阅请求
UNSUBACK11服务端到客户端取消订阅报文确认
PINGREQ12客户端到服务端心跳请求
PINGRESP13服务端到客户端心跳响应
DISCONNECT14两个方向都允许断开连接通知
AUTH15两个方向都允许认证信息交换

标志 Flags

 固定报头第1个字节的剩余的4位 [3-0]包含每个 MQTT 控制报文类型特定的标志如下表所示。表格中任何标记为“保留”的标志位,都是保留给以后使用的,必须设置为表格中列出的值 [MQTT-2.1.3-1]。如果收到非法的标志,此报文被当做无效报文。

控制报文固定报头标志Bit 3Bit 2Bit 1Bit 0
CONNECTReserved0000
CONNACKReserved0000
PUBLISHUsed in MQTT v5.0DUPQoSRETAIN
PUBACKReserved0000
PUBRECReserved0000
PUBRELReserved0010
PUBCOMPReserved0000
SUBSCRIBEReserved0010
SUBACKReserved0000
UNSUBSCRIBEReserved0010
UNSUBACKReserved0000
PINGREQReserved0000
PINGRESPReserved0000
DISCONNECTReserved0000
AUTHReserved0000

  • DUP =控制报文的重复分发标志
  • QoS = PUBLISH报文的服务质量等级
  • RETAIN = PUBLISH报文的保留标志 PUBLISH控制报文中的DUP

CONNECT Variable header

CONNECT 报文的可变报头按下列次序包含四个字段:协议名(Protocol Name),协议级别(Protocol Level),连接标志(Connect Flags),保持连接(Keep Alive)和属性(Properties)

CONNACK – 确认连接请求 Connect acknowledgement

CONNACK报文由服务端所发送,作为对来自客户端的CONNECT报文的响应。服务端在发送任何除AUTH以外的报文之前必须先发送包含原因码为0x00(成功)的CONNACK报文 [MQTT-3.2.0-1]。服务端在一次网络连接中不能发送多个CONNACK报文 [MQTT-3.2.0-2]。

如果客户端在合理的时间内没有收到服务端的CONNACK报文,客户端应该关闭网络连接。合理 的时间取决于应用的类型和通信基础设施。

MQTT pub/sub实践:

Eclipse Mosquitto

ubuntu本机安装 mosquitto 服务器:

apt install mosquitto

安装成功后MQTT服务器默认监听的是本地的1883端口,可通过指定配置文件修改listen接受外部服务器的发布订阅服务。

vim mosquitto.conf,向文件中添加如下内容

allow_anonymous true
listener 1883 0.0.0.0

详情参考官方文档:mosquitto.conf man page | Eclipse Mosquitto

 terminal 输入启动命令:

mosquitto -c ./mosquitto.conf //路径可以修改

安装成功后terminal验证发布/订阅功能:

  • -t  指定发布的Topic
  • -p 指定监听的端口号,默认1883,可修改其为他端口号
  • -m 发布的message
  • -h  指定host ip

1、Ctrl+Alt+T 打开一个terminal执行如下命令:

mosquitto_pub -t DataTopic -p 1884 -m "hhhhhhhhhhhhh"

2、新开另一个terminal执行如下命令:

mosquitto_sub -t DataTopic  -p 1884

如果mosquitto安装在远程服务器,增加-h 参数指定远程服务器IP即可:

mosquitto_pub -t DataTopic -h 10.101.12.16 -p 1884 -m "hhhhhhhhhhhhh"

#新开一个teminal
mosquitto_sub -t DataTopic -h 10.101.12.16 -p 1884

Paho provides MQTT client library implementations in a wide variety of languages.

golang版本:Eclipse Paho | The Eclipse Foundation

Example:

package main

import (
	"fmt"
	"os"

    MQTT "github.com/eclipse/paho.mqtt.golang"
)

//define a function for the default message handler
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) 
	fmt.Printf("TOPIC: %s\\n", msg.Topic())
	fmt.Printf("MSG: %s\\n", msg.Payload())


func main() 
	//create a ClientOptions struct setting the broker address, clientid, turn
	//off trace output and set the default message handler
	opts := MQTT.NewClientOptions().AddBroker("tcp://10.101.12.16:1884")
	opts.SetClientID("go-simple")
	opts.SetDefaultPublishHandler(f)

	//create and start a client using the above ClientOptions
	c := MQTT.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil 
		panic(token.Error())
	

	//subscribe to the topic /go-mqtt/sample and request messages to be delivered
	//at a maximum qos of zero, wait for the receipt to confirm the subscription

	if token := c.Subscribe("DataTopic", 0, nil); token.Wait() && token.Error() != nil 
		fmt.Println(token.Error())
		os.Exit(1)
	

	select   //阻塞防止程序退出

	//Publish 5 messages to /go-mqtt/sample at qos 1 and wait for the receipt
	//from the server after sending each message
	//for i := 0; i < 5; i++ 
	//	text := fmt.Sprintf("Hello Baby! this is msg #%d!", i)
	//	token := c.Publish("go-mqtt/sample", 0, false, text)
	//	token.Wait()
	//
	//
	unsubscribe from /go-mqtt/sample
	//if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil 
	//	fmt.Println(token.Error())
	//	os.Exit(1)
	//
	//

	//c.Disconnect(250)


以上是关于MQTT:Massage Queue Telemetry Transport消息队列遥测传输的主要内容,如果未能解决你的问题,请参考以下文章

MQTT:Massage Queue Telemetry Transport消息队列遥测传输

django的model的增删改

如何启用 ENTER 按钮发送消息

python之路:发附带文件的邮件

Android使用MQTT订阅及发布消息(初步了解Mqtt以及实现Android操作mqtt服务)

MQTT协议