MQTT入门

Posted xiyangyang8110

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQTT入门相关的知识,希望对你有一定的参考价值。

背景:大脑端突然报mqtt多次链接,不知道啥原因
报错信息:
E/MQTTManager: 连接断开:已断开连接 断开原因: java.net.SocketException: Connection reset
E/MQTTManager: 连接断开:已断开连接 断开原因: java.net.SocketException: Broken pipe
E/MQTTManager: 连接断开:已断开连接 断开原因: java.io.EOFException
解决:
增加重连保护机制,最多连7次
恶补下:
Message Queuing Telemetry(遥测:远程测量) Transport(传输协议),基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,为连接远程设备提供实时可靠的消息服务

MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

(1)Connect。等待与服务器建立连接。
(2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。
(3)Subscribe。等待完成订阅。
(4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。
(5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。

application中监听了所有activity的生命周期,在第一个activity启动时调用mqtt

 registerActivityLifecycleCallbacks(activityLifecycleCallbacks);

int activityAount = 0;

    ActivityLifecycleCallbacks activityLifecycleCallbacks = new ActivityLifecycleCallbacks() {
        @Override
        public void onActivityCreated(@NotNull Activity activity, Bundle savedInstanceState) {
            activityList.add(activity);
            ALog.d( "onActivityCreated",activity.getClass().getSimpleName(),activityList.size());
            Log.e("wy", "onActivityCreated   activity.getClass().getSimpleName: "+activity.getClass().getSimpleName()+" activityList.size: " +activityList.size());
            /*if (activityList.size() == 0) {
                ALog.d( "startService");
                Intent intent = new Intent(instance, WebSocketService.class);
                if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
                    startForegroundService(intent);
                } else {
                    ServiceUtils.startService(WebSocketService.class);
                }
            }*/
        }

        @Override
        public void onActivityStarted(Activity activity) {
            ALog.d( "onActivityStarted: "+activity.getClass().getSimpleName());
            Log.e("wy", "onActivityStarted   activity.getClass().getSimpleName: "+activity.getClass().getSimpleName()+" activityList.size: " +activityList.size());

            if (activityAount == 0) {
                //app回到前台
                if (!debug) {
                    if (mqttManager == null) {
                        mqttManager = MQTTManager.getInstance(instance);
                        mqttManager.setMqConnectLisense(new MQTTManager.MqConnectLisense() {
                            @Override
                            public void onSuccess() {
                                Log.e("wy", "onSuccess: " );
                                mqttManager.publish("{\\"domain\\":\\"video_chat\\",\\"command\\":\\"start\\"}");
                            }

                            @Override
                            public void onFailure(Throwable t) {
                                ALog.i("连接失败",t);
                                activity.runOnUiThread(() -> {
                                    Toast.makeText(activity, t.toString(), Toast.LENGTH_LONG).show();
                                });
                            }
                        });
                    } else {
                        Log.e("wy", "170onSuccess: " );
                        mqttManager.publish("{\\"domain\\":\\"video_chat\\",\\"command\\":\\"start\\"}");
                    }
                }
                ALog.d( "app启动或回到前台");
            }
            activityAount++;
        }
        }

在MQTTManager中
初始化

HOST = "tcp://" + getIp() + ":1883";
        Log.d(TAG, "HOST = " + HOST);
        String serverURI = HOST; //服务器地址(协议+地址+端口号)
        mqttandroidClient = new MqttAndroidClient(mContext, serverURI, CLIENTID);
        mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调
        mMqttConnectOptions = new MqttConnectOptions();
        mMqttConnectOptions.setCleanSession(false); //设置是否清除缓存
        mMqttConnectOptions.setConnectionTimeout(10); //设置超时时间,单位:秒
        mMqttConnectOptions.setKeepAliveInterval(20); //设置心跳包发送间隔,单位:秒
        mMqttConnectOptions.setUserName(USERNAME); //设置用户名
        mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //设置密码
 int connectNum=0;

    //订阅主题的回调
    private MqttCallback mqttCallback = new MqttCallback() {

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            Log.i(TAG, "收到消息: " + new String(message.getPayload()));
            Log.e("wy", "收到消息: " + new String(message.getPayload()));
            //收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等
            //response("message arrived");
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {

        }

        @Override
        public void connectionLost(Throwable arg0) {
            Log.e(TAG, "连接断开:"+arg0.getMessage()+" 断开原因: "+arg0.getCause());

            if(connectNum<7){
                connectNum++;
                doClientConnection();//连接断开,重连

            }

        }
    };

以上是关于MQTT入门的主要内容,如果未能解决你的问题,请参考以下文章

MQTT 协议学习:001-有关概念入门

MQTT 入门介绍

ESP32-C3入门教程 网络篇⑦——MQTT 应用示例

MQTT入门

MQTT协议入门

MQTT协议入门