EMQ ---websocket

Posted 清明-心若淡定

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EMQ ---websocket相关的知识,希望对你有一定的参考价值。

我们可以使用emq自带的Dashboard插件,进行websocket调试,打开谷歌浏览器输入网址,其中192.168.83.128是emq所在的IP地址:

http://192.168.83.128:18083/#/websocket

用户名:admin,密码:public

WebSocket URI:ws(s)://192.168.83.128:8083/mqtt

TCP URI:tcp://192.168.83.128:1883

一、测试实践

测试环境:使用MQTT 3.1.1版本协议,Wireshark 2.4.1版本,使用wireshark抓包分析(如果是虚拟机,要抓VMnet8虚拟网卡)。

clientid:861694030142473
username:libaineu2004
password:12345678

cleanSession:true

keepalive:60s

1、测试方案1,使用普通的tcp连接,TCP URI:tcp://192.168.83.128:1883,wireshark过滤条件tcp.port == 1883。
(1)、MQTT Connect Command
鼠标点击wireshark的Frame页面,右键菜单,复制,选中树的所有可见项目
MQ Telemetry Transport Protocol, Connect Command
    Header Flags: 0x10 (Connect Command)
    Msg Len: 51
    Protocol Name Length: 4
    Protocol Name: MQTT
    Version: 4
    Connect Flags: 0xc2
    Keep Alive: 60
    Client ID Length: 15
    Client ID: 861694030142473
    User Name Length: 12
    User Name: libaineu2004
    Password Length: 8
    Password: 12345678

右键菜单,复制,字节为HEX + ASCII转储
0000   10 33 00 04 4d 51 54 54 04 c2 00 3c 00 0f 38 36  .3..MQTT...<..86
0010   31 36 39 34 30 33 30 31 34 32 34 37 33 00 0c 6c  1694030142473..l
0020   69 62 61 69 6e 65 75 32 30 30 34 00 08 31 32 33  ibaineu2004..123
0030   34 35 36 37 38                                   45678

(2)、MQTT Connect Ack
MQ Telemetry Transport Protocol, Connect Ack
    Header Flags: 0x20 (Connect Ack)
    Msg Len: 2
    Acknowledge Flags: 0x00
    Return Code: Connection Accepted (0)

0000   20 02 00 00                                       ...


2、测试方案2,使用websocket连接,WebSocket URI:ws(s)://192.168.83.128:8083/mqtt,wireshark过滤条件tcp.port == 8083。我们可以观察到mqtt连接的过程,websocket协议,完整的数据被拆分了好几帧传输。每一帧报文,都有WebSocket和Data字段。我们仅仅需要关注Data字段即可。我们发现,把所有Data字段组合拼接起来,就是和测试方案1完全相同的数据。MQTT Connect Command和Ack都完全相同。说明了即使使用了Websocket方式来传输,仍然遵循mqtt协议。

从本质上来讲,Websocket也是基于 TCP 协议的,同时借用了HTTP的协议来完成一部分握手。 
主要解决 HTTP 协议中一个 request 对应一个 response 的尴尬。(http server 不能主动发送消息给 http client) 。通过 HTTP 完成 websocket 的握手过程,接着按照 websocket 协议进行通讯。 

websocket 也有他自己的数据帧格式: http://blog.csdn.net/u010487568/article/details/20569027

(1)MQTT Connect Command

0000   10                                               .
0000   33                                               3
0000   00 04                                            ..
0000   4d 51 54 54                                      MQTT
0000   04                                               .
0000   c2                                               .
0000   00 3c                                            .<
0000   00 0f                                            ..
0000   38 36 31 36 39 34 30 33 30 31 34 32 34 37 33     861694030142473
0000   00 0c                                            ..
0000   6c 69 62 61 69 6e 65 75 32 30 30 34              libaineu2004
0000   00 08                                            ..
0000   31 32 33 34 35 36 37 38                          12345678

(2)MQTT Connect Ack

Data (4 bytes)
    Data: 20020000
    [Length: 4]
0000   20 02 00 00                                       ...

 

二、以下是源码分析,先来看emq v1.1.3版本的源码:

1、-module(emqttd_app).

%%--------------------------------------------------------------------
%% Start Servers
%%--------------------------------------------------------------------

start_servers(Sup) ->
Servers = [
{"emqttd wsclient supervisor", {supervisor, emqttd_ws_client_sup}},
%% Start http listener
start_listener({http, ListenOn, Opts}) ->
mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []});
打开8083端口,开启http服务器。Websocket也是基于 TCP 协议的,同时借用了HTTP的协议来完成一部分握手。 

 

2、-module(emqttd_http).

%%--------------------------------------------------------------------
%% MQTT Over WebSocket
%%--------------------------------------------------------------------

handle_request(‘GET‘, "/mqtt", Req) ->
lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
Upgrade = Req:get_header_value("Upgrade"),
Proto = Req:get_header_value("Sec-WebSocket-Protocol"),
case {is_websocket(Upgrade), Proto} of
{true, "mqtt" ++ _Vsn} ->
emqttd_ws:handle_request(Req);
{false, _} ->
lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
Req:respond({400, [], <<"Bad Request">>});
{_, Proto} ->
lager:error("WebSocket with error Protocol: ~s", [Proto]),
Req:respond({400, [], <<"Bad WebSocket Protocol">>})
end;
websocket的入口函数。重点关注emqttd_ws:handle_request(Req);

 

3、-module(emqttd_ws).

ws_loop(Data, State = #wsocket_state{peer = Peer, client_pid = ClientPid,
parser_fun = ParserFun}, ReplyChannel) ->
?WSLOG(debug, Peer, "RECV ~p", [Data]),
case catch ParserFun(iolist_to_binary(Data)) of
{more, NewParser} ->
State#wsocket_state{parser_fun = NewParser};
{ok, Packet, Rest} ->
gen_server:cast(ClientPid, {received, Packet}),
ws_loop(Rest, reset_parser(State), ReplyChannel);
{error, Error} ->
?WSLOG(error, Peer, "Frame error: ~p", [Error]),
exit({shutdown, Error});
{‘EXIT‘, Reason} ->
?WSLOG(error, Peer, "Frame error: ~p", [Reason]),
?WSLOG(error, Peer, "Error data: ~p", [Data]),
exit({shutdown, parser_error})
end.
 gen_server:cast(ClientPid, {received, Packet}), 接收来自客户端的消息。

 

4、-module(emqttd_ws_client).

handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
case emqttd_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} ->
noreply(State#wsclient_state{proto_state = ProtoState1});
{error, Error} ->
?WSLOG(error, Peer, "Protocol error - ~p", [Error]),
shutdown(Error, State);
{error, Error, ProtoState1} ->
shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
{stop, Reason, ProtoState1} ->
stop(Reason, State#wsclient_state{proto_state = ProtoState1})
end;
emqttd_protocol:received(Packet, ProtoState)。接收处理消息。

 

5、-module(emqttd_protocol).

process(Packet = ?CONNECT_PACKET(Var), State0) ->

#mqtt_packet_connect{proto_ver = ProtoVer,
proto_name = ProtoName,
username = Username,
password = Password,
clean_sess = CleanSess,
keep_alive = KeepAlive,
client_id = ClientId} = Var,

State1 = State0#proto_state{proto_ver = ProtoVer,
proto_name = ProtoName,
username = Username,
client_id = ClientId,
clean_sess = CleanSess,
keepalive = KeepAlive,
will_msg = willmsg(Var),
connected_at = os:timestamp()},

trace(recv, Packet, State1),

{ReturnCode1, SessPresent, State3} =
case validate_connect(Var, State1) of
validate_connect(Var, State1),校验clientid,username和password的有效性































































































































以上是关于EMQ ---websocket的主要内容,如果未能解决你的问题,请参考以下文章

EMQ 的共享订阅

EMQ ---websocket

emq知识点

物联网架构成长之路-EMQ插件配置

EMQ学习---客户链接资源消耗

Centos 7 安装MQTT(EMQ)服务端