多个进程如何在 Erlang 中同时使用一个公共列表?

Posted

技术标签:

【中文标题】多个进程如何在 Erlang 中同时使用一个公共列表?【英文标题】:how can multiple processes use one common list concurrently in Erlang? 【发布时间】:2016-11-28 00:43:36 【问题描述】:

我知道Erlang 是关于并发的,我们使用spawn/spawn_link 创建一个进程我不明白的是所有进程如何同时使用一个通用的用户列表?说一个 ordict/dict 存储。

我想做的是; 1. 生成的用户进程订阅/监听注册进程A 2.注册过程A店铺Pid, Userid所有在线用户 3.当一些用户发送消息时,用户进程询问进程A收件人是否在线。

erlang 中发送消息是异步的,但是当一个用户被多个用户发送消息时它也是异步的吗?

【问题讨论】:

除了史蒂夫回答之外,我还提供了一个“玩具”模块来说明它是如何工作的。 【参考方案1】:

您可以将进程A 设为gen_server process,并将存储在线用户的任何数据结构保留为进程状态。存储新用户或删除用户可以使用gen_server:cast/2 完成,检查用户是否在线可以使用gen_server:call/2 完成。或者,您可以让gen_server 创建一个公开可读的ets table 以允许任何进程读取它以检查在线用户,但存储和删除仍需要转换为gen_server。您甚至可以使该表公开可读和可写,以便任何进程都可以存储、删除或检查用户。但请记住,默认情况下,ets 表在创建它的进程死亡时被销毁,因此如果您需要它保留,即使创建它的gen_server 死亡,您必须安排它被继承其他一些进程,或give it to a supervisor。

【讨论】:

【参考方案2】:

一个严肃的解决方案应该使用 Steve 建议的 OTP 行为(gen_server、supervisor...)。 无论如何,我编写了一个小示例模块,它实现了服务器和客户端,并且可以使用命令 erl -sname test 在一个节点上启动(或使用 erl -sname node1erl -sname node2... 的多个节点)。

它还包括一个shell会话的例子,说明了大多数情况,我希望它可以帮助你了解进程之间的交流,同步或异步。

注意: 对用户列表的访问不是并发的,如果该列表属于本例中的服务器进程,则不可能。这就是为什么 Steve 建议使用 ETS 来存储信息并进行真正的并发访问。我试图用接口编写示例,这些接口应该允许使用 ETS 而不是元组列表进行快速重构。

-module(example).

-export([server/0,server_stop/1,server_register_name/2,server_get_address/2, server_quit/2, % server process and its interfaces
         client/1,quit/1,register_name/2,get_address/2,send_message/3,print_messages/1, % client process and its interfaces
         trace/0]). % to call the tracer for a nice message view

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Client interface
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

client(Node) ->
    % connect the current node to the servernode given in parameter
    % it will fail if the connection cannot be established
    true = net_kernel:connect_node(Node),
    % spawn a client process
    spawn(fun () -> client([],unregistered,server,Node) end).

register_name(ClientPid,Name) ->
    % use a helper to facilitate the trace of everything
    send_trace(ClientPid,register_name,self(),Name),
    % wait for an answer, it is then a synchronous call
    receive
        % no work needed, simply return any value
        M -> M 
    after 1000 ->
        % this introduce a timeout, if no answer is received after 1 second, consider it has failed
        no_answer_from_client
    end.

get_address(ClientPid,UserName) ->
    send_trace(ClientPid,get_address,self(),UserName),
    % wait for an answer, it is then a synchronous call
    receive
        % in this case, if the answer is tagged with ok, extract the Value (will be a Pid)
        ok,Value -> Value;
        M -> M 
    after 1000 ->
        no_answer_from_client
    end.

send_message(ClientPid,To,Message) ->
    % simply send the message, it is asynchronous
    send_trace(ClientPid,send_message,To,Message).

print_messages(ClientPid) ->
    send_trace(ClientPid,print_messages).

quit(ClientPid) ->
    send_trace(ClientPid,quit).



%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% client local functions
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

client(Messages,Name,Server) ->
    receive
        register_name,From,UserName when Name == unregistered ->
            % if not yet registered send the request to the server and
            % backward the answer to the requester
            Answer = server_register_name(Server,UserName),
            send_trace(From,Answer),
            NName = case Answer of
                registered -> UserName;
                _ -> Name
            end,
            client(Messages,NName,Server);
        register_name,From,_ ->
            % if already registered reject the request
            send_trace(From,already_registered_as,Name),
            client(Messages,Name,Server);
        get_address,From,UserName when Name =/= unregistered ->
            Answer = server_get_address(Server,UserName),
            send_trace(From,Answer),
            client(Messages,Name,Server);           
        send_message,To,Message ->
            % directly send the message to the user, the server is not concerned
            send_trace(To,new_message,erlang:date(),erlang:time(),Name,Message),
            client(Messages,Name,Server);
        print_messages ->
            % print all mesages and empty the queue
            do_print_messages(Messages),
            client([],Name,Server);
        quit ->
            server_quit(Server,Name);
        new_message,M ->
            % append the new message
            client([M|Messages],Name,Server);
        _ ->
            client(Messages,Name,Server)
        end.

do_print_messages(Messages) ->
    lists:foreach(fun(D,T,W,M) -> io:format("from ~p, at ~p on ~p, received ~p~n",[W,T,D,M]) end,Messages).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Server interface
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

server() ->
    true = register(server,spawn(fun () -> server([]) end)),
    node().

server_stop(Server) ->
    send_trace(Server,stop).

server_register_name(Server,User) ->
    send_trace(Server,register_name,self(),User),
    receive
        M -> M
    after 900 ->
        no_answer_from_server
    end.

server_get_address(Server,User) ->
    send_trace(Server,get_address,self(),User),
    receive
        M -> M
    after 900 ->
        no_answer_from_server
    end.

server_quit(Server,Name) ->
    send_trace(Server,quit,Name).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% server local functions
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

server(Users) ->
    receive
        stop ->
            ok;
        register_name,From,User ->
            case lists:keyfind(User,1,Users) of
                false ->
                    send_trace(From,registered),
                    server([User,From|Users]);
                _ -> 
                    send_trace(From,already_exist,User),
                    server(Users)
                end;
        get_address,From,User ->
            case lists:keyfind(User,1,Users) of
                false ->
                    send_trace(From,does_not_exist,User),
                    server(Users);
                User,Pid ->
                    send_trace(From,ok,Pid),
                    server(Users)
                end;
        quit,Name ->
            server(lists:keydelete(Name,1,Users))
    end.



%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% global
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

trace() -> 
% start a collector, a viewer and trace the "trace_me" ...
    et_viewer:start([trace_global, true, trace_pattern, et,max,max_actors,20]).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% helpers
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

send_trace(To,Message) ->
    % all messages will be traced by "et"
    et:trace_me(50,self(),To,Message,[]),
    To ! Message.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% shell commands
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

% c(example).
% example:trace().
% N = node().
% C1 = example:client(N).
% example:register_name(pid(0,5555,0),"fails").
% example:register_name(C1,"fails_again").
% example:server().
% example:register_name(C1,"Joe").
% C2 = example:client(N).
% example:register_name(C2,"Bob").
% example:print_messages(C1).
% C2 = example:get_address(C1,"Bob").
% example:send_message(C1,C2,"Hi Bob!").
% example:send_message(C1,C2,"Hi Bob! are you there?").
% example:print_messages(C2).
% example:send_message(C2,C1,"Hi Joe! Got your message.").
% example:print_messages(C2).
% example:print_messages(C1).
% example:quit(C1).
% example:get_address(C2,"Joe").
% example:server_stop(server,N).
% example:get_address(C2,"Joe").
% example:get_address(C1,"Bob").

这里是事件查看器的摘录:

【讨论】:

如何获取“事件查看器”图片? 在 send_trace 函数中,我添加了对et:trace_me/5 的调用,它什么也不做,只是“预配置”了使用我在 trace/0 函数中调用的et_viewer:start([trace_global, true, trace_pattern, et,max,max_actors,20]) 进行跟踪。请注意,您需要 wx_widget 在您的平台上工作。顺便说一句,如果有人知道如何强制进程 和服务器在同一时间线上,我将不胜感激。

以上是关于多个进程如何在 Erlang 中同时使用一个公共列表?的主要内容,如果未能解决你的问题,请参考以下文章

Erlang 监控多个进程

Erlang消息发送和接收与多个进程

在 Erlang 中作为进程生成的函数内部定义的函数

Erlang 语言中的进程与并发

如何使用公共列从多个表中检索数据

Erlang 中的接受器池和负载平衡?