MQTT入门
Posted xiyangyang8110
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQTT入门相关的知识,希望对你有一定的参考价值。
背景:大脑端突然报mqtt多次链接,不知道啥原因
报错信息:
E/MQTTManager: 连接断开:已断开连接 断开原因: java.net.SocketException: Connection reset
E/MQTTManager: 连接断开:已断开连接 断开原因: java.net.SocketException: Broken pipe
E/MQTTManager: 连接断开:已断开连接 断开原因: java.io.EOFException
解决:
增加重连保护机制,最多连7次
恶补下:
Message Queuing Telemetry(遥测:远程测量) Transport(传输协议),基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,为连接远程设备提供实时可靠的消息服务
MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:
(1)Connect。等待与服务器建立连接。
(2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。
(3)Subscribe。等待完成订阅。
(4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。
(5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。
application中监听了所有activity的生命周期,在第一个activity启动时调用mqtt
registerActivityLifecycleCallbacks(activityLifecycleCallbacks);
int activityAount = 0;
ActivityLifecycleCallbacks activityLifecycleCallbacks = new ActivityLifecycleCallbacks() {
@Override
public void onActivityCreated(@NotNull Activity activity, Bundle savedInstanceState) {
activityList.add(activity);
ALog.d( "onActivityCreated",activity.getClass().getSimpleName(),activityList.size());
Log.e("wy", "onActivityCreated activity.getClass().getSimpleName: "+activity.getClass().getSimpleName()+" activityList.size: " +activityList.size());
/*if (activityList.size() == 0) {
ALog.d( "startService");
Intent intent = new Intent(instance, WebSocketService.class);
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
startForegroundService(intent);
} else {
ServiceUtils.startService(WebSocketService.class);
}
}*/
}
@Override
public void onActivityStarted(Activity activity) {
ALog.d( "onActivityStarted: "+activity.getClass().getSimpleName());
Log.e("wy", "onActivityStarted activity.getClass().getSimpleName: "+activity.getClass().getSimpleName()+" activityList.size: " +activityList.size());
if (activityAount == 0) {
//app回到前台
if (!debug) {
if (mqttManager == null) {
mqttManager = MQTTManager.getInstance(instance);
mqttManager.setMqConnectLisense(new MQTTManager.MqConnectLisense() {
@Override
public void onSuccess() {
Log.e("wy", "onSuccess: " );
mqttManager.publish("{\\"domain\\":\\"video_chat\\",\\"command\\":\\"start\\"}");
}
@Override
public void onFailure(Throwable t) {
ALog.i("连接失败",t);
activity.runOnUiThread(() -> {
Toast.makeText(activity, t.toString(), Toast.LENGTH_LONG).show();
});
}
});
} else {
Log.e("wy", "170onSuccess: " );
mqttManager.publish("{\\"domain\\":\\"video_chat\\",\\"command\\":\\"start\\"}");
}
}
ALog.d( "app启动或回到前台");
}
activityAount++;
}
}
在MQTTManager中
初始化
HOST = "tcp://" + getIp() + ":1883";
Log.d(TAG, "HOST = " + HOST);
String serverURI = HOST; //服务器地址(协议+地址+端口号)
mqttandroidClient = new MqttAndroidClient(mContext, serverURI, CLIENTID);
mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调
mMqttConnectOptions = new MqttConnectOptions();
mMqttConnectOptions.setCleanSession(false); //设置是否清除缓存
mMqttConnectOptions.setConnectionTimeout(10); //设置超时时间,单位:秒
mMqttConnectOptions.setKeepAliveInterval(20); //设置心跳包发送间隔,单位:秒
mMqttConnectOptions.setUserName(USERNAME); //设置用户名
mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //设置密码
int connectNum=0;
//订阅主题的回调
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(TAG, "收到消息: " + new String(message.getPayload()));
Log.e("wy", "收到消息: " + new String(message.getPayload()));
//收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等
//response("message arrived");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
}
@Override
public void connectionLost(Throwable arg0) {
Log.e(TAG, "连接断开:"+arg0.getMessage()+" 断开原因: "+arg0.getCause());
if(connectNum<7){
connectNum++;
doClientConnection();//连接断开,重连
}
}
};
以上是关于MQTT入门的主要内容,如果未能解决你的问题,请参考以下文章