MQTT研究之EMQ:EMQX使用中的一些问题记录

Posted shihuc

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQTT研究之EMQ:EMQX使用中的一些问题记录相关的知识,希望对你有一定的参考价值。

我的测试环境:

Linux: CentOS7

EMQX:V3.2.3

 

题外话:

这里主要介绍Websocket的支持问题。

 

对ws的支持比较正常,但是对wss的支持,调了较长的时间,没有成功。

 

javascript支持mqtt的工具有paho.mqtt.js和mqtt.js. 我这里为了研究wss,所以,重点采用的是mqtt.js.基于这两种工具包,ws的通信,我这里都很顺利的调通了,通信没有问题。

 

看代码

因为paho.mqtt.js没有看到有支持SSL的地方(至少从库里面的配置参数上,没有看到。。。),基于mqtt.js的wss配置,是有参数可以支持的,所以重点研究的是mqtt.js的使用。下面是我的wss代码。

<html>
<head>
  <title>Ws mqtt.js</title>
  <script src="./js/mqtt-3.0.0.min.js"></script>
</head>
<body>
    <script> 
    var myca = 
"-----BEGIN CERTIFICATE-----"+
"MIIDhDCCAmygAwIBAgIGAW2+wl6XMA0GCSqGSIb3DQEBCwUAMFcxFDASBgNVBAMTC0lPVFBMQVRG"+
"T1JNMQswCQYDVQQLEwJJVDELMAkGA1UEChMCVEsxCzAJBgNVBAcTAldIMQswCQYDVQQIEwJIQjEL"+
"MAkGA1UEBhMCQ04wHhcNMTkxMDEyMDY1NzUyWhcNNDkxMDEyMDY1NzUyWjBXMRQwEgYDVQQDEwtJ"+
"T1RQTEFURk9STTELMAkGA1UECxMCSVQxCzAJBgNVBAoTAlRLMQswCQYDVQQHEwJXSDELMAkGA1UE"+
"CBMCSEIxCzAJBgNVBAYTAkNOMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoBcnvo/P"+
"XT9keD5NC6nviy12hFQ7W+YCTMZJiGgi2lGI9uytYivzom7+WshLMqrUwE3E2BCTiLmGGGHv+GTd"+
"kkGbpF0wsZPpefTBw0czQNOQgMIKOw1t11GijakHvuzJKgRofjyfew2YqF+cBxOM/f0xhExfjft8"+
"qLUUJ80w9VEJDeBhmE20esH9if0KYfjHOJiw5h1p4qG/W6xIZhSm22bgsBGd75zE7HmYWxO2bSwb"+
"nt7boqk9Vyyeg/6XCujHHGekPpUKRsDfAPNDtYaUgSjqir6t4qDRHh3A2gxUlyWjQfCHhJOKSIi7"+
"BAV5qn3aHvXht9liSNiQHMhgeuJTjwIDAQABo1YwVDAfBgNVHSMEGDAWgBQN7Ujpzizp8uezK+Pm"+
"37StF+YJ1zASBgNVHRMBAf8ECDAGAQH/AgEAMB0GA1UdDgQWBBQN7Ujpzizp8uezK+Pm37StF+YJ"+
"1zANBgkqhkiG9w0BAQsFAAOCAQEAYPBChFSrAjjssqb6uwuZtD0JWZ4MiOKJFTNrfChmzcCOotYR"+
"IOEFF1UHw+vWbe/iYAFQvhG83DmbOVAgC/uTIXVL5W+H4Wh7SfToqDNMYR8hmtMYFyjzF/0eXN3c"+
"IvmusfZdDJrpkEhQ7OxlN58UQZSrakPz/q8mXN2pe9bRUGh1sQ0hlwuLI+M4GX8SK68XRxEZfC3S"+
"qvl+YLL7jQRwUduVvjK33SepXjG3JrM6ds8Q6pvXCFDcLymo5XuV6GX7HGuFIcg1u4PUnuM7xgw7"+
"vzSwQOvGXdkDZrsdrZa2b9wVxgLFc6VY0neLwjjFBVSHIziQ+vaDrbzhc8h06U1h4Q=="+
"-----END CERTIFICATE-----";
    var mycert = 
"-----BEGIN CERTIFICATE-----"+
"MIIDfTCCAmWgAwIBAgIGAW2+25yZMA0GCSqGSIb3DQEBCwUAMFcxFDASBgNVBAMTC0lPVFBMQVRG"+
"T1JNMQswCQYDVQQLEwJJVDELMAkGA1UEChMCVEsxCzAJBgNVBAcTAldIMQswCQYDVQQIEwJIQjEL"+
"MAkGA1UEBhMCQ04wHhcNMTkxMDEyMDY1NzUyWhcNMjkxMDEyMDcyNTI2WjBZMRYwFAYDVQQDEw0x"+
"MC45NS4yMDAuMTU1MQswCQYDVQQLEwJJVDELMAkGA1UEChMCVEsxCzAJBgNVBAcTAldIMQswCQYD"+
"VQQIEwJIQjELMAkGA1UEBhMCQ04wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCNncv+"+
"iY61KPT9GiHCsAs03EkW2s6usfHq8fgnrfOr4uEATBYsh+UuVM2dCEb37A57PYwBLY3ZgqnjkQke"+
"WHwf/iyofpte1c3zF64Qi9qt4xuDnWNxAE1DmxCNcKGD1oNEW/5/6g5kVaApD2EjD9DPbL2+/nHT"+
"oEyVuqi04JYVLslJOaE0kthtSOakcdzGAOlEYTZbrzlBexSTQxDlRlFnO6IT+DaFpFr0W9ovBVj8"+
"pp0a7PzjMB6mzUlq+tdU1n46YiuqzPHZK7ZdLl1Nbm7Dpmz8k09uIvUtpWTqbowq/SB3YX+yRg2T"+
"VAVgQb3u9faGYMmhHLiF11jNpxhkjHTfAgMBAAGjTTBLMB8GA1UdIwQYMBaAFA3tSOnOLOny57Mr"+
"4+bftK0X5gnXMAkGA1UdEwQCMAAwHQYDVR0OBBYEFHVOM4gj+f2AlNUzHfh2sWOo7n/uMA0GCSqG"+
"SIb3DQEBCwUAA4IBAQCP0gWTUuQ27gvNiqicOr7vFpQ/MNKOK6stY856kIZr5DDtRs4X3SPv37wL"+
"6VHx9hxFRJ8oYL+7QBsuFo7UIxsu1N7/Saf5jEjOcmzcgj2EVmyWNR3TcgF+qTeRIGZXKnHd1ted"+
"5D9D8RcLbKHSwTX3/7u8QNie5D+4XE4oMpJywjxyKv9ypxzEXmLU8/qIznkE9HdJctMyFiltHPGR"+
"x+cjIsy3kyIxz7LfL2bhDogEXbfofbCm/tEJ1EBNf7+mhZQhNfrU8CUBOXiWg/2weX/sh++TF+Oe"+
"DDj4I+gB4BK5293Lh9F9A3DpirxEFksUcGxdmeDYybwNTYjnByL1pxFd"+
"-----END CERTIFICATE-----";
    var mykey = 
"-----BEGIN PRIVATE KEY-----"+
"MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCNncv+iY61KPT9GiHCsAs03EkW"+
"2s6usfHq8fgnrfOr4uEATBYsh+UuVM2dCEb37A57PYwBLY3ZgqnjkQkeWHwf/iyofpte1c3zF64Q"+
"i9qt4xuDnWNxAE1DmxCNcKGD1oNEW/5/6g5kVaApD2EjD9DPbL2+/nHToEyVuqi04JYVLslJOaE0"+
"kthtSOakcdzGAOlEYTZbrzlBexSTQxDlRlFnO6IT+DaFpFr0W9ovBVj8pp0a7PzjMB6mzUlq+tdU"+
"1n46YiuqzPHZK7ZdLl1Nbm7Dpmz8k09uIvUtpWTqbowq/SB3YX+yRg2TVAVgQb3u9faGYMmhHLiF"+
"11jNpxhkjHTfAgMBAAECggEAGNUmxEeSwqmf7BH9AYqlLGvEVUGhJHkHls8/WJXkOKvcEJVG/iAG"+
"TA2K0JJ2/1phX7L3Le2zBgUnjUQNeUjC20lSw2kQDZ0oNAxe9X/QpNVCKd4bZeFLqyMxz9uENv4S"+
"npYTFpMQWv+4yfpsah2H3BeRXkB37UgapdbJZo5LQwNieJlKbZVazOg3We7d90epxToV1zrEO21h"+
"4VfN/OzpsQ4VeMoFSEozFZHU4E7rXhSw7zZqd7vZwnhfUAvLvzAMrfuVOTknNkooty9CzU1f6UEk"+
"bjSyUfHndGMUl45Nws4mWoWgDbF3sjytcjBUKxpqbduy0StBGEz3ipj86k7FwQKBgQDAnNV7pzlK"+
"BvnZ0FhoJn/rXIx3stt0O2w8k0oSdkZ67cValSQHSlS0bRwofj59TK5Wj8/DKerqJKXQgWt9MCcy"+
"5k7541NG31B7EEOeLYZcsQA1kNYYvgkghrj30OC3+SwbiUEU2E+LDF5Mq2VIdUqAKamH6n9Xc9mF"+
"UALHh59FKQKBgQC8OKYEbhQicUppLtk2d/5tzV6zZySZq2bZz3Yzs9AyegBs8eAUF5dk86t1DRg3"+
"+nXvqvB1BSL0kW7OWuwGw2LYP3xbSM2vQ/Yp5RwRFtBXbWyd6Brw3AZf33L0H/6EmUUZDZTSI8wn"+
"YP4av2zYqY0LErX+giyzDqYPjr0CSaRixwKBgQCUdo6q0Bt9buEYvdnW9Y32CZEQFmHHNqJYEqOu"+
"lVIAINPU3U42/FlL9SoWIDDkfb2HZTQcV2wF7BePHqKOjRY5yoGnZUxkPW8YXFHyU17UhW4G28va"+
"qGf6lT1vbqY3yCzyUJpEifLN18u7xwS7lATHwtU0uVBMRTSt6B0sVOIz+QKBgHYBykyiu2rwvqd9"+
"oN/ekZ1EEmjGdRMHfJ94y/IUfab4CMZFS6ktNVUs1MW5ZQLLvB98e+/SPZ06hU9JgAupepJ8Ezqk"+
"RZjqMi8IHvWz6QG3cE78U8/JExIO5WKboJaABet0MWci3H+f9xDMDbE1dGGJ5KLh+KZ2u7SrWGNT"+
"yoXHAoGBAKGwZM4/ZUOVnl1nMDg+m96A9+VaGT/d8mxImTDdDmEuarjmGuiqqygWMMJirgQDovsD"+
"BTZP3/a5N3TmqVt4RP+5Sd5T2svlC536etHOIltuyXeJSsYSrk0ifvdG94ePhlGBrWLt8wxKEaG6"+
"BLCty7md7EUHxNS19Oa4BV7JVN9t"+
"-----END PRIVATE KEY-----";
    var options = {
          protocolId: MQTT,
          protocolVersion: 4,
          clean: true,
          reconnectPeriod: 1000,
          connectTimeout: 30 * 1000,
          clientId: emqx_mqtt_js001,
          username: shihuc,
          password: shihuc,
          keepalive: 50,          
          rejectUnauthorized: true, //若不是wss,这个参数改为false,然后下面的三个参数都不需要出现在options里面
          key: mykey,
          cert: mycert,        
          // The CA list will be used to determine if server is authorized
          ca: myca
    }
    const sendTopic = "kkk/send";
    const recvTopic = "kkk/recv";
    const client  = mqtt.connect(wss://10.95.200.13:8084/mqtt, options); //ws模式下,将protocol改为ws,端口改为8083

    // 连接建立后,可以在此处设置应用逻辑作    
    client.on(connect, function () {
        client.subscribe(recvTopic, {qos: 0});
        client.publish(sendTopic, "connection is successful!", {qos: 0, retain: false }) ;     
    }) 

    // 在这里可以添加业务逻辑,正常接收消息
    client.on("message", function (topic, payload) {
        console.log([topic, payload].join(": "));        
    })
    
    // 重连成功,在这里设置重连的业务逻辑
    client.on(reconnect, (error) => {        
        console.log(正在重连:, error);    
    })

    // 这里显示连接错误的信息,可以添加业务逻辑
    client.on(error, (error) => {
        console.log(连接失败:, error);
        client.end();
    })
        
    function send() {
        var s = document.getElementById("msg").value;
        if (s) {
            s = "{time:" + new Date().Format("yyyy-MM-dd hh:mm:ss") + ", content:" + (s) + ", from: web console based on mqtt.js}";                        
            client.publish(sendTopic, s);
            document.getElementById("msg").value = "";
        }
    }

    var count = 0;

    function start() {
        window.tester = window.setInterval(function () {
            if (client.connected) {
                var s = "{time:" + new Date().Format("yyyy-MM-dd hh:mm:ss") + ", content:" + (count++) + ", from: web console based on mqtt.js}";                
                client.publish(sendTopic, s);
            }
        }, 1000);
    }

    function stop() {
        window.clearInterval(window.tester);
    }

    Date.prototype.Format = function (fmt) { //author: meizz 
        var o = {
            "M+": this.getMonth() + 1, //月份 
            "d+": this.getDate(), //
            "h+": this.getHours(), //小时 
            "m+": this.getMinutes(), //
            "s+": this.getSeconds(), //
            "q+": Math.floor((this.getMonth() + 3) / 3), //季度 
            "S": this.getMilliseconds() //毫秒 
        };
        if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length));
        for (var k in o)
            if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr(("" + o[k]).length)));
        return fmt;
    }
    </script>
    <input type="text" id="msg" />
    <input type="button" value="Send" onclick="send()" />
    <input type="button" value="Start" onclick="start()" />
    <input type="button" value="Stop" onclick="stop()" />
</body>
</html>

 

一直不成功,双向验证的时候,总是报服务端证书不对。

技术图片

 

这个问题,让我想起来,之前研究EMQX的CoAP协议在DTLS安全通信模式下不工作的问题了。是EMQX目前不支持么?

 

有知道的看官,可以留言支招,若你有任何想法。。。

 

以上是关于MQTT研究之EMQ:EMQX使用中的一些问题记录的主要内容,如果未能解决你的问题,请参考以下文章

mqtt的Emq服务器的搭建

有关 EMQ X 水平可扩展性的挑战与对策 - MQTT Broker 集群详解

EMQ X 规则引擎系列 (八)桥接消息到 MQTT Broker

阿里云下Ubuntu18.04安装部署EMQ X 消息服务器

阿里云下Ubuntu18.04安装部署EMQ X 消息服务器

Windows安装EMQ服务器(mqtt)