Android使用MQTT订阅及发布消息(初步了解Mqtt以及实现Android操作mqtt服务)

Posted Tobey_r1

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android使用MQTT订阅及发布消息(初步了解Mqtt以及实现Android操作mqtt服务)相关的知识,希望对你有一定的参考价值。

android使用MQTT订阅及发布消息((一)初步了解Mqtt以及实现Android操作mqtt服务)

关于

  可能有很多小伙伴和我一样是初次知道mqtt,然后它是啥,用来干什么那就更不清楚了,前段时间公司要求调研这方面,所以今天这篇文章就来介绍mqtt是啥,以及Android可以用它来干啥。

MQTT介绍

MQTT协议实现方式

  实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
  • payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
    MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
    当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。

MQTT服务器

  MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

  • (1)接受来自客户的网络连接;
  • (2)接受客户发布的应用信息;
  • (3)处理来自客户端的订阅和退订请求;
  • (4)向订阅的客户转发应用程序消息。

MQTT协议中的订阅、主题、会话

一、订阅(Subscription)
  订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。
二、会话(Session)
  每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。
三、主题名(Topic Name)
  连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。
四、主题筛选器(Topic Filter)
  一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。
五、负载(Payload)
  消息订阅者所具体接收的内容。

MQTT协议中的方法

  MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

  • (1)Connect。等待与服务器建立连接。
  • (2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。
  • (3)Subscribe。等待完成订阅。
  • (4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。
  • (5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。

MQTT服务质量 (QoS)

  qos为0"至多一次",消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。
qos为1"至少一次",确保消息到达,但消息重复可能会发生。
qos为2"只有一次",确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。
可以在订阅/发布消息的时候设置服务质量。

MQTT服务端(Broker)

  网上有开源的服务端项目代码可以在电脑上进行搭建,也可以试用一些供应商的服务器。

MQTT客户端

经调研,Android开发mqtt客户端主流使用的是eclipse提供的paho.mqtt,项目引用:

implementation'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

mqtt服务选取

  因为是调研,所以就不考虑自己去打一个mqtt的服务,网上搜能搜到很多搭建的mqtt,这里我是用了一个三方可以免费试用14天的服务方EMQ,使用前需要注册一个账号,如果你有github账号的话可以直接提供使用。
然后我们选中试用的服务(基础版和专业版都可以试用14天,这边建议试用专业版的),我因为之前已经试用过了专业版,所以现在只能试用基础版的了,专业版的好处就是提供的ip:

  然后我们等待它自动部署好项目即可:

  完成之后我们需要添加一个认证用户,这个认证用户用来连接时候判断身份用的:


  认证的用户及密码需要记一下。

Android连接mqtt

配置

  首先要在项目的build文件里添加如下引用:

implementation'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

其实这里有个问题,target为Android 12的(31),需要去做很多修改,包括去掉引用,当然这部分我准备放到第二篇里面来讲如何适配Android12版本手机实现mqtt使用。
  修改Androidmanifest.xml文件,添加权限和mqttservice的的注册:

<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

<service android:name="org.eclipse.paho.android.service.MqttService" />

修改mainActivity文件

private val TAG = "MqttClient"
private lateinit var mqttClient: MqttAndroidClient
override fun onCreate(savedInstanceState: Bundle?) 
   //....
   val serverURI = "tcp://pc1c6e1c.cn-shenzhen.emqx.cloud:11838"
   mqttClient = MqttAndroidClient(this, serverURI, "kotlin_mqtt_test1") //"kotlin_mqtt_test1"是作为连接客户端的名称来使用,所以要注意避免重复

fun connect() 
        mqttClient.setCallback(object : MqttCallback 
            override fun messageArrived(topic: String?, message: MqttMessage?) 
                Log.d(TAG, "Receive message: $message.toString() from topic: $topic")
            

            override fun connectionLost(cause: Throwable?) 
                Log.d(TAG, "Connection lost $cause.toString()")
            

            override fun deliveryComplete(token: IMqttDeliveryToken?) 

            
        )
        val options = MqttConnectOptions()
        options.apply 
                userName = "tobeyr1"
                this.password = "1234".toCharArray()
                connectionTimeout = 12
                this.keepAliveInterval = 0
                this.isAutomaticReconnect = false
                this.isCleanSession = true
            
        try 
            mqttClient.connect(options, null, object : IMqttActionListener 
                override fun onSuccess(asyncActionToken: IMqttToken?) 
                    Log.d(TAG, "Connection success")
                

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) 
                    Log.d(TAG, "Connection failure")
                
            )
         catch (e: MqttException) 
            e.printStackTrace()
        

    

  其中我们的serverurl可以在橄榄中看到:

  然后我们可以在调试台的监控里面看到已经连接到了mqtt服务:

订阅 topic

private fun subscribe(topic: String, qos: Int = 1) 
        try 
            mqttClient.subscribe(topic, qos, null, object : IMqttActionListener 
                override fun onSuccess(asyncActionToken: IMqttToken?) 
                    Log.d(TAG, "Subscribed to $topic")
                

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) 
                    Log.d(TAG, "Failed to subscribe $topic")
                
            )
         catch (e: MqttException) 
            e.printStackTrace()
        
    

  订阅成功之后也可以在控制台看到订阅的主题:

取消订阅 topic

private fun unsubscribe(topic: String) 
        try 
            mqttClient.unsubscribe(topic, null, object : IMqttActionListener 
                override fun onSuccess(asyncActionToken: IMqttToken?) 
                    Log.d(TAG, "Unsubscribed to $topic")
                

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) 
                    Log.d(TAG, "Failed to unsubscribe $topic")
                
            )
         catch (e: MqttException) 
            e.printStackTrace()
        
    

发布消息publish

private fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) 
        try 
            val message = MqttMessage()
            message.payload = msg.toByteArray()
            message.qos = qos
            message.isRetained = retained
            mqttClient.publish(topic, message, null, object : IMqttActionListener 
                override fun onSuccess(asyncActionToken: IMqttToken?) 
                    Log.d(TAG, "$msg published to $topic")
                

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) 
                    Log.d(TAG, "Failed to publish $msg to $topic")
                
            )
         catch (e: MqttException) 
            e.printStackTrace()
        
    

断开 MQTT 连接

private fun disconnect() 
        try 
            mqttClient.disconnect(null, object : IMqttActionListener 
                override fun onSuccess(asyncActionToken: IMqttToken?) 
                    Log.d(TAG, "Disconnected")
                

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) 
                    Log.d(TAG, "Failed to disconnect")
                
            )
         catch (e: MqttException) 
            e.printStackTrace()
        
    

  好了,本篇简单介绍Android(11及以下版本)连接mqtt服务就到此结束了,下篇将会介绍兼容Android12版本调用mqtt服务。有问题欢迎批评指正,觉得不错的也请点个赞,多谢。

以上是关于Android使用MQTT订阅及发布消息(初步了解Mqtt以及实现Android操作mqtt服务)的主要内容,如果未能解决你的问题,请参考以下文章

了解 mqtt 订阅者 qos

通过 mosquitto 了解 MQTT协议

node-red实现MQTT通讯

EMQX v4.4.5 发布:新增排他订阅及 MQTT 5.0 发布属性支持

MQTT协议及推送服务

MQTT