paho.MQTT
Posted Link2Points
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了paho.MQTT相关的知识,希望对你有一定的参考价值。
文章目录
EMQX
安装Broker
docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:4.3.8
基于原项目修改
https://github.com/eclipse/paho.mqtt.python
https://github.com/eclipse/paho.mqtt.android
Python编写
下载
pip install paho-mqtt
代码
#!/usr/bin/python
# coding:utf-8
import paho.mqtt.client as mqtt
import json
# Module-level connection settings for the demo.
# Broker address; empty in this listing, so it has to be filled in before use.
HOST = ""
# Broker TCP port; read directly by Project_Mqtt.do_connect.
PORT = 1883
# Credentials and client id; presumably passed by the caller to
# Project_Mqtt.do_connect, which takes them as parameters - confirm at the call site.
user = "test"
pw = "test"
client_id = "test"
class Project_Mqtt:
    """Small convenience wrapper around a paho-mqtt client.

    Typical use: ``do_connect`` first, then ``do_publish`` and/or
    ``do_subscribe``; ``do_stop`` shuts the client down. ``self.client`` only
    exists after ``do_connect`` has been called.
    """

    # Establish the connection
    def do_connect(self, HOST, user, pw, client_id, port=None, keepalive=60):
        """Create the client, set the credentials and connect to the broker.

        :param HOST: broker host name or IP address.
        :param user: user name for authentication.
        :param pw: password for authentication.
        :param client_id: MQTT client identifier.
        :param port: broker port; ``None`` falls back to the module-level PORT.
        :param keepalive: keepalive interval in seconds handed to ``connect``.
        """
        # Keyword form so the id can never be taken for a different positional
        # parameter. NOTE(review): newer paho-mqtt releases put a
        # callback_api_version argument first - verify against the installed
        # version.
        self.client = mqtt.Client(client_id=client_id)
        self.client.username_pw_set(user, pw)
        self.client.connect(HOST, PORT if port is None else port, keepalive)

    # Handle messages received on subscribed topics
    def deal_on_message(self, client, userdata, msg):
        """Print an incoming message and return its JSON-decoded payload.

        Has the (client, userdata, msg) signature that is assigned to
        ``client.on_message`` in ``do_subscribe``.

        :return: the decoded JSON value, or ``None`` when the payload is not
            valid UTF-8 encoded JSON.
        """
        # errors="replace" keeps a payload with invalid UTF-8 bytes from
        # raising out of the callback just because of the debug print.
        print(msg.topic + " " + msg.payload.decode("utf-8", errors="replace"))
        try:
            messages = json.loads(msg.payload)
        except ValueError:
            # Covers both json.JSONDecodeError and UnicodeDecodeError.
            messages = None
        return messages

    # Subscribe
    def do_subscribe(self, topic, qos=0):
        """Subscribe to ``topic`` and run the network loop.

        Calls ``loop_forever()`` on the client, so this method does not return
        until that loop ends.
        """
        # Install the handler before subscribing so it is already in place
        # when the loop starts delivering messages.
        self.client.on_message = self.deal_on_message
        self.client.subscribe(topic, qos=qos)
        self.client.loop_forever()

    # Publish
    def do_publish(self, topic, message, qos=0):
        """Publish ``message`` to ``topic`` with the given QoS, not retained."""
        self.client.publish(topic, message, qos=qos, retain=False)

    # Stop
    def do_stop(self):
        """Disconnect from the broker and stop the network loop.

        Does nothing when ``do_connect`` has not been called yet.
        """
        client = getattr(self, "client", None)
        if client is None:
            return
        # Per the paho-mqtt documentation, loop_stop() only stops a loop
        # started with loop_start(); the blocking loop_forever() used by
        # do_subscribe ends when the client disconnects.
        client.disconnect()
        client.loop_stop()
android配置编写及问题解决
对高版本的SDK的一种暂时的可适用方法。
- 解决问题:localbroadcastmanager未定义(高版本API已不支持),参考Github issue,引用第三方包但无效。故须在根目录下配置gradle.properties
1. 根目录下
- gradle.properties添加
android.enableJetifier=true
2. app目录下
- build.gradle添加
implementation 'com.android.support:support-v4:30+'
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
implementation 'androidx.localbroadcastmanager:localbroadcastmanager:1.0.0'
- build.gradle修改SDK31 为 SDK30
- build.gradle 同步
3. AndroidManifest.xml 添加
<!-- Permissions the Application Requires -->
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<!--<uses-permission android:name="android.permission.READ_PHONE_STATE" />-->
<uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.INTERNET" />
<!-- MQTT服务(须放在 <application> 标签内) -->
<service android:name="org.eclipse.paho.android.service.MqttService"/>
4. MainActivity.java
serverUri 须为【主机的ip】
package com.example.mqtta;
import android.widget.Toast;
import androidx.appcompat.app.AppCompatActivity;
import android.os.Bundle;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * Demo activity that connects to an MQTT broker through the Paho Android
 * service, subscribes to {@link #subscriptionTopic} once the connection
 * succeeds, publishes {@link #publishMessage} whenever a connection is
 * (re)established, and shows every received payload in a Toast.
 */
public class MainActivity extends AppCompatActivity {
    MqttAndroidClient mqttAndroidClient;
    // Broker URI. The declaration was commented out although the field is used
    // in onCreate, so the class could not compile. Replace the address with the
    // IP of the host running the broker.
    final String serverUri = "tcp://10.96.131.119:1883";
    String clientId = "test";
    final String subscriptionTopic = "test";
    final String publishTopic = "test";
    final String publishMessage = "Hello World!";
    final String name = "test";
    final String password = "test";

    /**
     * Creates the MQTT client, installs the callbacks and starts the
     * asynchronous connect.
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // The timestamp suffix gives every launch its own client id.
        // NOTE(review): combined with setCleanSession(false) below, each launch
        // presumably leaves a separate persistent session on the broker - confirm
        // this is intended.
        clientId = clientId + System.currentTimeMillis();
        mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId);
        mqttAndroidClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                // Runs for the first connection and for automatic reconnects.
                publishMessage();
            }

            @Override
            public void connectionLost(Throwable cause) {
                // Previously empty; at least leave a trace of why the link dropped.
                System.err.println("Connection lost: "
                        + (cause == null ? "no cause reported" : cause.getMessage()));
            }

            // Handle messages received on subscribed topics
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                Toast.makeText(MainActivity.this, new String(message.getPayload()),
                        Toast.LENGTH_LONG).show();
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Nothing to do for this demo.
            }
        });

        // Connection options
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setUserName(name);
        mqttConnectOptions.setPassword(password.toCharArray());

        try {
            mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_LONG).show();
                    subscribeToTopic();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Toast.makeText(MainActivity.this, "连接失败", Toast.LENGTH_LONG).show();
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                }
            });
        } catch (MqttException ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Releases the MQTT client when the activity goes away.
     */
    @Override
    protected void onDestroy() {
        if (mqttAndroidClient != null) {
            // NOTE(review): unregisterResources()/close() are the documented way
            // to release what MqttAndroidClient holds on to - verify against the
            // Paho Android service version in use.
            mqttAndroidClient.unregisterResources();
            mqttAndroidClient.close();
        }
        super.onDestroy();
    }

    // Subscribe
    /**
     * Subscribes to {@link #subscriptionTopic} with QoS 0. Incoming messages
     * are delivered to the callback installed in {@code onCreate}; the original
     * author noted that the subscribe overload taking an
     * {@code IMqttMessageListener} did not work here.
     */
    public void subscribeToTopic() {
        try {
            mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Toast.makeText(MainActivity.this, "订阅成功", Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    // Previously swallowed silently.
                    System.err.println("Failed to subscribe to: " + subscriptionTopic);
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                }
            });
        } catch (MqttException ex) {
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }
    }

    // Publish
    /**
     * Publishes {@link #publishMessage} to {@link #publishTopic}. When the
     * client is not connected, only the "Lost of Connected" notice is shown and
     * nothing is published.
     */
    public void publishMessage() {
        // Check the connection first: the original published and announced
        // "Message Published" before noticing that the client was offline.
        if (mqttAndroidClient == null || !mqttAndroidClient.isConnected()) {
            Toast.makeText(MainActivity.this, "Lost of Connected", Toast.LENGTH_SHORT).show();
            return;
        }
        try {
            MqttMessage message = new MqttMessage();
            message.setPayload(publishMessage.getBytes());
            mqttAndroidClient.publish(publishTopic, message);
            Toast.makeText(MainActivity.this, "Message Published", Toast.LENGTH_SHORT).show();
        } catch (MqttException e) {
            System.err.println("Error Publishing: " + e.getMessage());
            e.printStackTrace();
        }
    }
}
以上是关于paho.MQTT的主要内容,如果未能解决你的问题,请参考以下文章
Android WebView 和 WebSockets / Paho.MQTT