Android使用MQTT订阅及发布消息(初步了解Mqtt以及实现Android操作mqtt服务)
Posted Tobey_r1
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android使用MQTT订阅及发布消息(初步了解Mqtt以及实现Android操作mqtt服务)相关的知识,希望对你有一定的参考价值。
android使用MQTT订阅及发布消息((一)初步了解Mqtt以及实现Android操作mqtt服务)
关于
可能有很多小伙伴和我一样是初次知道mqtt,然后它是啥,用来干什么那就更不清楚了,前段时间公司要求调研这方面,所以今天这篇文章就来介绍mqtt是啥,以及Android可以用它来干啥。
MQTT介绍
MQTT协议实现方式
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
- Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
- payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。
MQTT服务器
MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:
- (1)接受来自客户的网络连接;
- (2)接受客户发布的应用信息;
- (3)处理来自客户端的订阅和退订请求;
- (4)向订阅的客户转发应用程序消息。
MQTT协议中的订阅、主题、会话
一、订阅(Subscription)
订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。
二、会话(Session)
每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。
三、主题名(Topic Name)
连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。
四、主题筛选器(Topic Filter)
一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。
五、负载(Payload)
消息订阅者所具体接收的内容。
MQTT协议中的方法
MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:
- (1)Connect。等待与服务器建立连接。
- (2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。
- (3)Subscribe。等待完成订阅。
- (4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。
- (5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。
MQTT服务质量 (QoS)
qos为0"至多一次",消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。
qos为1"至少一次",确保消息到达,但消息重复可能会发生。
qos为2"只有一次",确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。
可以在订阅/发布消息的时候设置服务质量。
MQTT服务端(Broker)
网上有开源的服务端项目代码可以在电脑上进行搭建,也可以试用一些供应商的服务器。
MQTT客户端
经调研,Android开发mqtt客户端主流使用的是eclipse提供的paho.mqtt,项目引用:
implementation'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
mqtt服务选取
因为是调研,所以就不考虑自己去打一个mqtt的服务,网上搜能搜到很多搭建的mqtt,这里我是用了一个三方可以免费试用14天的服务方EMQ,使用前需要注册一个账号,如果你有github账号的话可以直接提供使用。
然后我们选中试用的服务(基础版和专业版都可以试用14天,这边建议试用专业版的),我因为之前已经试用过了专业版,所以现在只能试用基础版的了,专业版的好处就是提供的ip:
然后我们等待它自动部署好项目即可:
完成之后我们需要添加一个认证用户,这个认证用户用来连接时候判断身份用的:
认证的用户及密码需要记一下。
Android连接mqtt
配置
首先要在项目的build文件里添加如下引用:
implementation'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
其实这里有个问题,target为Android 12的(31),需要去做很多修改,包括去掉引用,当然这部分我准备放到第二篇里面来讲如何适配Android12版本手机实现mqtt使用。
修改Androidmanifest.xml文件,添加权限和mqttservice的的注册:
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<service android:name="org.eclipse.paho.android.service.MqttService" />
修改mainActivity文件
private val TAG = "MqttClient"
private lateinit var mqttClient: MqttAndroidClient
override fun onCreate(savedInstanceState: Bundle?)
//....
val serverURI = "tcp://pc1c6e1c.cn-shenzhen.emqx.cloud:11838"
mqttClient = MqttAndroidClient(this, serverURI, "kotlin_mqtt_test1") //"kotlin_mqtt_test1"是作为连接客户端的名称来使用,所以要注意避免重复
fun connect()
mqttClient.setCallback(object : MqttCallback
override fun messageArrived(topic: String?, message: MqttMessage?)
Log.d(TAG, "Receive message: $message.toString() from topic: $topic")
override fun connectionLost(cause: Throwable?)
Log.d(TAG, "Connection lost $cause.toString()")
override fun deliveryComplete(token: IMqttDeliveryToken?)
)
val options = MqttConnectOptions()
options.apply
userName = "tobeyr1"
this.password = "1234".toCharArray()
connectionTimeout = 12
this.keepAliveInterval = 0
this.isAutomaticReconnect = false
this.isCleanSession = true
try
mqttClient.connect(options, null, object : IMqttActionListener
override fun onSuccess(asyncActionToken: IMqttToken?)
Log.d(TAG, "Connection success")
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?)
Log.d(TAG, "Connection failure")
)
catch (e: MqttException)
e.printStackTrace()
其中我们的serverurl可以在橄榄中看到:
然后我们可以在调试台的监控里面看到已经连接到了mqtt服务:
订阅 topic
private fun subscribe(topic: String, qos: Int = 1)
try
mqttClient.subscribe(topic, qos, null, object : IMqttActionListener
override fun onSuccess(asyncActionToken: IMqttToken?)
Log.d(TAG, "Subscribed to $topic")
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?)
Log.d(TAG, "Failed to subscribe $topic")
)
catch (e: MqttException)
e.printStackTrace()
订阅成功之后也可以在控制台看到订阅的主题:
取消订阅 topic
private fun unsubscribe(topic: String)
try
mqttClient.unsubscribe(topic, null, object : IMqttActionListener
override fun onSuccess(asyncActionToken: IMqttToken?)
Log.d(TAG, "Unsubscribed to $topic")
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?)
Log.d(TAG, "Failed to unsubscribe $topic")
)
catch (e: MqttException)
e.printStackTrace()
发布消息publish
private fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false)
try
val message = MqttMessage()
message.payload = msg.toByteArray()
message.qos = qos
message.isRetained = retained
mqttClient.publish(topic, message, null, object : IMqttActionListener
override fun onSuccess(asyncActionToken: IMqttToken?)
Log.d(TAG, "$msg published to $topic")
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?)
Log.d(TAG, "Failed to publish $msg to $topic")
)
catch (e: MqttException)
e.printStackTrace()
断开 MQTT 连接
private fun disconnect()
try
mqttClient.disconnect(null, object : IMqttActionListener
override fun onSuccess(asyncActionToken: IMqttToken?)
Log.d(TAG, "Disconnected")
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?)
Log.d(TAG, "Failed to disconnect")
)
catch (e: MqttException)
e.printStackTrace()
好了,本篇简单介绍Android(11及以下版本)连接mqtt服务就到此结束了,下篇将会介绍兼容Android12版本调用mqtt服务。有问题欢迎批评指正,觉得不错的也请点个赞,多谢。
以上是关于Android使用MQTT订阅及发布消息(初步了解Mqtt以及实现Android操作mqtt服务)的主要内容,如果未能解决你的问题,请参考以下文章