EMQ学习 ---集群

Posted saryli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EMQ学习 ---集群相关的知识,希望对你有一定的参考价值。

emqttd集群设置管理

一、先来看EMQ的文档定义:http://emqtt.com/docs/v1/cluster.html

emqttd集群设置管理

假设部署两台服务器s1.emqtt.io, s2.emqtt.io上部署集群:

节点名 主机名(FQDN) IP地址

[email protected][email protected]	s1.emqtt.io	192.168.0.10
[email protected][email protected]	s2.emqtt.io	192.168.0.20

Warning

节点名格式: [email protected], Host必须是IP地址或FQDN(主机名.域名)

[email protected]节点设置

emqttd/etc/vm.args:

节点加入集群

启动两台节点后,[email protected]上执行:

$ ./bin/emqttd_ctl cluster join [email protected]

Join the cluster successfully.
Cluster status: [{running_nodes,[‘[email protected]‘,‘[email protected]‘]}]

或,[email protected]上执行:

$ ./bin/emqttd_ctl cluster join [email protected]

Join the cluster successfully.
Cluster status: [{running_nodes,[‘[email protected]‘,‘[email protected]‘]}]

  

任意节点上查询集群状态:

$ ./bin/emqttd_ctl cluster status

Cluster status: [{running_nodes,[‘[email protected]‘,‘[email protected]‘]}]

 

节点退出集群

节点退出集群,两种方式:

  • leave: 本节点退出集群
  • remove: 从集群删除其他节点

[email protected]主动退出集群:

$ ./bin/emqttd_ctl cluster leave

[email protected]节点上,从集群删除[email protected]节点:

$ ./bin/emqttd_ctl cluster remove [email protected]

 

emqttd_ctl是怎么使用的?

-module(emqttd_cli).有定义要加载的命令

-export([status/1, broker/1, cluster/1, users/1, clients/1, sessions/1,
         routes/1, topics/1, subscriptions/1, plugins/1, bridges/1,
         listeners/1, vm/1, mnesia/1, trace/1]).
load() ->
    Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
    [emqttd_ctl:register_cmd(Cmd, {?MODULE, Cmd}, []) || Cmd <- Cmds].

 

加入集群后,子节点mnesia数据库怎么办?

mnesia数据库天然支持分布式集群。子节点加入之后就类似mysql数据库主从备份一样,主节点和子节点mnesia会保持同步。来看源码:

-module(emqttd_mnesia).

%% @doc Join the mnesia cluster
-spec(join_cluster(node()) -> ok).
join_cluster(Node) when Node =/= node() ->
    %% Stop mnesia and delete schema first
    ensure_ok(ensure_stopped()),
    ensure_ok(delete_schema()),
    %% Start mnesia and cluster to node
    ensure_ok(ensure_started()),
    ensure_ok(connect(Node)),
    ensure_ok(copy_schema(node())),
    %% Copy tables
    copy_tables(),
    ensure_ok(wait_for(tables)).

子节点加入之后,会先删除自己的mnesia数据库和各个表,然后copy一份主节点的库,再copy各个表数据。

%% @doc Cluster with node.
-spec(connect(node()) -> ok | {error, any()}).
connect(Node) ->
    case mnesia:change_config(extra_db_nodes, [Node]) of
        {ok, [Node]} -> ok;
        {ok, []}     -> {error, {failed_to_connect_node, Node}};
        Error        -> Error
    end.

%% @doc Copy schema.
copy_schema(Node) ->
    case mnesia:change_table_copy_type(schema, Node, disc_copies) of
        {atomic, ok} ->
            ok;
        {aborted, {already_exists, schema, Node, disc_copies}} ->
            ok;
        {aborted, Error} ->
            {error, Error}
    end.

%% @doc Copy mnesia tables.
copy_tables() ->
    emqttd_boot:apply_module_attributes(copy_mnesia).

 

函数copy_tables(),会检索和执行emq工程目录下所有erl模块里面的mnesia(copy)函数,模块要求含有"-copy_mnesia({mnesia, [copy]})."关键字

例如:

-module(emqttd_backend).

mnesia(copy) ->
    ok = emqttd_mnesia:copy_table(retained_message),
    ok = emqttd_mnesia:copy_table(backend_subscription).

-module(emqttd_router).

-copy_mnesia({mnesia, [copy]}).

mnesia(copy) ->
    ok = emqttd_mnesia:copy_table(route, ram_copies).

EMQ工程目录下,有关键字-boot_mnesia({mnesia, [boot]}).和-copy_mnesia({mnesia, [copy]}).的模块是:

-module(emqttd_backend).
-module(emqttd_pubsub).
-module(emqttd_router).
-module(emqttd_server).
-module(emqttd_sm).
-module(emqttd_trie).

其中,emqttd_backend模块新建的数据库retained_message和backend_subscription是disc_copies类型的,其他模块是ram_copies类型的。

emqttd的mnesia初始化

1、-module(emqttd_app).

start(_StartType, _StartArgs) ->
print_banner(),
emqttd_mnesia:start(),

  


2、-module(emqttd_mnesia)

start() ->
ensure_ok(ensure_data_dir()),
ensure_ok(init_schema()),
ok = mnesia:start(),
init_tables(),
wait_for(tables).

%% @doc Init mnesia schema or tables.
init_schema() ->
case mnesia:system_info(extra_db_nodes) of
[] -> mnesia:create_schema([node()]);
[_|_] -> ok
end.

%% @private
%% @doc Init mnesia tables.
init_tables() ->
case mnesia:system_info(extra_db_nodes) of
[] -> create_tables();
[_|_] -> copy_tables()
end.

 

3、数据库启动和拷贝的例子-module(emqttd_backend).

-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).

%% Mnesia callbacks
%%--------------------------------------------------------------------

mnesia(boot) ->
ok = emqttd_mnesia:create_table(retained_message, [
{type, ordered_set},
{disc_copies, [node()]},
{record_name, retained_message},
{attributes, record_info(fields, retained_message)},
{storage_properties, [{ets, [compressed]},
{dets, [{auto_save, 1000}]}]}]),
ok = emqttd_mnesia:create_table(backend_subscription, [
{type, bag},
{disc_copies, [node()]},
{record_name, mqtt_subscription},
{attributes, record_info(fields, mqtt_subscription)},
{storage_properties, [{ets, [compressed]},
{dets, [{auto_save, 5000}]}]}]);

mnesia(copy) ->
ok = emqttd_mnesia:copy_table(retained_message),
ok = emqttd_mnesia:copy_table(backend_subscription).
4、数据库启动和拷贝的例子-module(emqttd_router).

-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).

mnesia(boot) ->
ok = emqttd_mnesia:create_table(route, [
{type, bag},
{ram_copies, [node()]},
{record_name, mqtt_route},
{attributes, record_info(fields, mqtt_route)}]);

mnesia(copy) ->
ok = emqttd_mnesia:copy_table(route, ram_copies).

4、数据库启动和拷贝的例子-module(emqttd_router).

-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).

mnesia(boot) ->
    ok = emqttd_mnesia:create_table(route, [
                {type, bag},
                {ram_copies, [node()]},
                {record_name, mqtt_route},
                {attributes, record_info(fields, mqtt_route)}]);

mnesia(copy) ->
    ok = emqttd_mnesia:copy_table(route, ram_copies).

 

注意事项

1、如果EMQ所在服务器的IP地址是192.168.0.10

那么节点名称A:[email protected]和节点名称B:[email protected]是相同的意思,如果EMQ以节点A启动服务器,那么再以节点B启动是会失败的。

此时只能把A或B其中一个更名一下。即节点名格式: [email protected]里面的Name要加以区分。

2、集群的信息会记录在工程目录下,/rel/emqttd/data/mnesia/[email protected]/schema.DAT

即,当子节点A连接了主节点B,集群信息会分别记录在schema.DAT。如果子节点A没有主动断开集群,下次重启时,仍然会主动连接主节点B。

★有几个遗留问题待确认,不知道EMQ V2版本有无修正:

问题(1)如果子节点A没有主动断开集群,下次重启时,如果B不存在,那么A就会启动失败!好可怕!

问题(2)A连接上B之后。A目录下的文件/rel/emqttd/data/mnesia/[email protected]/retained_message.DCD和backend_subscription.DCD就自我删除了。以后也见不着了,彻底消失了,奇怪!请注意,这两个Mnesia数据库表类型是持久化,disc_copies。

★2018/05/17实测emq2.3.7,A主B从,结论如下:

(1)B join A之后,会主动删除B的Mnesia表,然后从A拷贝一份过来。B leave A之后,也会删除B的Mnesia表。

(2)B join A之后,A和B的Mnesia表会始终保持一致性。

添加或删除或更新A的表数据,B会同步。

添加或删除或更新B的表数据,A会同步。

(3)集群的信息会记录在工程目录下,/rel/emqttd/data/mnesia/[email protected]/schema.DAT。A或B进程退出后,再次启动时,仍然保持集群状态。

3、常用命令

./emqttd console
./emqttd start
./emqttd stop
./emqttd_ctl cluster join [email protected]
./emqttd_ctl cluster status

./emqttd_ctl cluster leave

werl -name [email protected] -setcookie emqsecretcookie
observer:start().
./_rel/emqttd/bin/emqttd console
./_rel/emqttd/bin/emqttd start
./_rel/emqttd/bin/emqttd_ctl status
./_rel/emqttd/bin/emqttd stop
./_rel/emqttd/bin/emqttd_ctl cluster join [email protected]
./_rel/emqttd/bin/emqttd_ctl cluster status
./_rel/emqttd/bin/emqttd_ctl cluster leave

 



 




























以上是关于EMQ学习 ---集群的主要内容,如果未能解决你的问题,请参考以下文章

物联网架构成长之路(33)-EMQ数据存储到influxDB

EMQ 学习---订阅$SYS主题,捕获客户端上下线消息

尝试使用开源 MQTT 代理 EMQ 为我的 IoT 项目设置我的 MQTT 云服务

当 EMQ X 遇见一维卷积神经网络:探索 AIoT 应用新可能

EMQ MQTT云服务器搭建 - 阿里云轻量应用服务器

EMQ ---客户端上线自动订阅主题