Erlang,尝试制作 gen_server: 调用有很多响应

Posted

技术标签:

【中文标题】Erlang,尝试制作 gen_server: 调用有很多响应【英文标题】:Erlang, try to make gen_server: call with many responses 【发布时间】:2016-01-13 22:04:26 【问题描述】:

尝试在项目中使用 OTP-style 并得到一个 OTP-interface 问题。哪种方案更受欢迎/更美观?

我有什么:

    带有mochiweb 的网络服务器 一个进程,产生许多 (1000-2000) 个子进程。 儿童包含状态(netflow-speed)。如果需要,处理给孩子的代理消息并创建新的孩子。

在 mochiweb 中,我有一页显示所有演员的速度,乳清是如何制作的:

    nf_collector ! get_abonents_speed, self(),
    receive
        abonents_speed_count, AbonentsCount ->
            ok
    end,
%% write http header, chunked
%% and while AbonentsCount != 0,  receive speed and write http

这不是选择风格,我怎么能理解。解决方案:

    在 API 同步函数中获取所有速度的请求并返回所有速度的列表。但我想马上写给客户。

    API-function 的一个参数是回调:

    nf_collector:get_all_speeds(fun (Speed) -> Resp:write_chunk(templater(Speed)) end)
    
    返回迭代器: get_all_speeds 的结果之一将与接收块一起使用。每次调用都会返回ok, Speed,最后返回end

get_all_speeds() ->
    nf_collector ! get_abonents_speed, self(),
    receive
        abonents_speed_count, AbonentsCount ->
            ok
    end,
    ok, fun() -> 
        create_receive_fun(AbonentsCount)
    end.

create_receive_fun(0)->
    end;

create_receive_fun(Count)->
        receive
            abonent_speed, Speed ->
                Speed
        end,
        ok, Speed, create_receive_fun(Count-1).

【问题讨论】:

真正的问题是什么?您是否在实施任何选项时遇到困难,如果是,您应该询问与此相关的问题。否则,这将主要是完全取决于您的用例的意见。 我同意 Adam 的观点,并且认为这更像是一个设计问题,但没有足够的信息提供任何建议。为什么有 1000-2000 个包含状态的孩子。为什么有进程返回计数并发出调用而不是返回那些孩子并让调用者决定做什么?读/写比率是多少?什么是非功能性需求,比如低延迟或吞吐量更重要?它是主要功能还是占系统其余部分的比例?等等。如果没有其他信息,这对我来说没有多大意义。 写这个问题的原因很简单:erlang 提供了编写基于actor的程序的简单方法,OTP 提供了标准化。首先,我编写了没有 OTP 的程序,理解程序逻辑很复杂。添加 OTP 后,它变得平坦而简单。在这里,我遇到了一种更复杂的行为,即同步/异步调用。我问路,如果遇到类似的问题,其他 erlang 开发人员会选择什么。或者这个问题是由糟糕的设计引起的,答案是 - 做简单,没有 1000 个(或更多)演员 尝试改写:如果我的电话与许多演员联系在一起,那么我必须处理许多情况:1. 当我的电话接收消息时,一个或多个演员死亡 2. 呼叫发起者死亡 3. 造成 2一次 -3 次以上的呼叫 4. 其他... 哪里有任何 OTP 行为、良好的编码标准或关于我如何做到这一点的建议?其他查看我的代码的 erlang 编码人员的回答并没有开始瞎忙 :) 如果不是,真的,我可以通过多种方式做到这一点,所以......我可以关闭这个问题 【参考方案1】:

从主管那里生出你的“孩子”:

-module(ch_sup).
-behaviour(supervisor).
-export([start_link/0, init/1, start_child/1]).
start_link() -> supervisor:start_link(local, ?MODULE, ?MODULE, []).
init([]) -> ok, simple_one_for_one, [ch, ch, start_link, [], transient, 1000, worker, [ch]].
start_child(Data) -> supervisor:start_child(?MODULE, [Data]).

用 ch_sup:start_child/1 启动它们(数据是什么)。

将您的孩子实现为 gen_server:

-module(ch).
-behaviour(gen_server).
-record(?MODULE, speed).

...

get_speed(Pid, Timeout) ->
    try
        gen_server:call(Pid, get, Timeout)
    catch
        exit:timeout, _ -> timeout;
        exit:noproc, _ -> died
    end
.

...

handle_call(get, _From, St) -> reply, ok, St#?MODULE.speed, St end.

您现在可以使用主管获取正在运行的孩子的列表并查询他们,但您必须接受在获取孩子列表和调用他们之间有孩子死亡的可能性,显然孩子可能出于某种原因活着但没有响应,或响应错误等。

上面的 get_speed/2 函数返回 ok, Speed 或 dead 或 timeout。您可以根据自己的应用需求进行适当的过滤;通过列表理解很容易,这里有一些。

只是速度:

[Speed || ok, Speed <- [ch:get_speed(Pid, 1000) || Pid <-
    [Pid || undefined, Pid, worker, [ch] <-
        supervisor:which_children(ch_sup)
        ]
    ]].

Pid 和速度元组:

[Pid, Speed || Pid, ok, Speed <-
    [Pid, ch:get_speed(Pid, 1000) || Pid <-
        [Pid || undefined, Pid, worker, [ch] <-
                supervisor:which_children(ch_sup)]
        ]
    ].

所有结果,包括超时和在您到达之前死亡的孩子的“死亡”结果:

[Pid, Any || Pid, Any <-
    [Pid, ch:get_speed(Pid, 1000) || Pid <-
        [Pid || undefined, Pid, worker, [ch] <-
                supervisor:which_children(ch_sup)]
        ]
    ].

在大多数情况下,您几乎可以肯定除了速度之外不想要任何东西,因为您将如何处理死亡和超时?你希望那些死去的人由主管重生,所以问题在你知道的时候或多或少地得到解决,而超时,就像任何错误一样,是一个单独的问题,可以用你看到的任何方式处理适合...不过,无需将故障修复逻辑与数据检索逻辑混合在一起。

现在,我认为您在帖子中提到的所有这些问题,但我不太确定,1000 的超时是针对每个调用的,并且每个调用是一个接一个地同步的,因此对于 1000 个超时时间为 1 秒的孩子,可能需要 1000 秒才能产生任何结果。让时间超时 1ms 可能是答案,但要正确地做到这一点有点复杂:

get_speeds() ->
    ReceiverPid = self(),
    Ref = make_ref(),
    Pids = [Pid || undefined, Pid, worker, [ch] <-
            supervisor:which_children(ch_sup)],
    lists:foreach(
        fun(Pid) -> spawn(
            fun() -> ReceiverPid ! Ref, ch:get_speed(Pid, 1000) end
            ) end,
        Pids),
    receive_speeds(Ref, length(Pids), os_milliseconds(), 1000)
.

receive_speeds(_Ref, 0, _StartTime, _Timeout) ->
    [];
receive_speeds(Ref, Remaining, StartTime, Timeout) ->
    Time = os_milliseconds(),
    TimeLeft = Timeout - Time + StartTime,
    receive
        Ref, acc_timeout ->
            [];
        Ref, ok, Speed ->
            [Speed | receive_speeds(Ref, Remaining-1, StartTime, Timeout)];
        Ref, _ ->
            receive_speeds(Ref, Remaining-1, StartTime, Timeout)
    after TimeLeft ->
        []
    end
.

os_milliseconds() ->
    OsMegSecs, OsSecs, OsMilSecs = os:timestamp(),
    round(OsMegSecs*1000000 + OsSecs + OsMilSecs/1000)
.

这里每个调用都在不同的进程中产生并收集回复,直到“主超时”或全部收到。

代码大部分是从我的各种作品中剪切和粘贴的,并通过手动和搜索替换进行编辑,以匿名并删除多余的,所以它可能主要是可编译的质量,但我不保证我没有不要破坏任何东西。

【讨论】:

感谢您的精彩回答!你给每个孩子打电话。我认为对每个孩子进行异步调用并获得结果。 (或者如果使用 OTP 规范,则增加并行同步调用,就像你通过产生 新演员所做的那样(我害怕产生 1000 个孩子,你告诉产生 1000* :)))为什么我认为标准call 是个坏主意: 1. 迭代所有子进程并按顺序调用 - 速度很慢 2. 如果为每个同步调用生成演员 - 演员太多 也许,进行同步调用,调用结果 ok, pid 以及之后 - 发送速度,调用者获取所有 pid,添加监视以监视死亡。 我的解决方案可能是:我正在同步调用主管,他为我的 pid 保存孩子列表,发送给所有孩子 get_speed(MyPid) 并回答 ok, ChildsCount。孩子们把他的速度发给我。如果孩子死了并且他的 pid 在保存的列表中 - 主管给我发送 err, child_die 或一些东西,因为我可以计算答案的数量,他没有收到什么。但是bug: 1.主管什么时候可以删除已保存的列表? 2.如果我死了,孩子们如何理解需要停止发送速度? 我重新阅读了 OTP 文档并认为我的问题是“奇怪的”,并且需要更同步(并且更慢:)),但要遵循 OPT 原则。或更改我的应用程序的架构。我这里有一些代码(所有代码都很脏,现在没有 OTP,我尝试学习并重写它)如果有人对我出生的问题感兴趣,欢迎:github.com/kolko/erl_nf_collector我将你的回答标记为正确,因为它(并重新-阅读 OPT 文档)帮助我理解我的问题的奇怪设计。谢谢大家! 很高兴它对您有所帮助。也许为了完整起见,我应该包括最后一种情况,但因为您的问题是关于迁移到 OTP,这意味着您的永久和瞬时状态进程将是 gen_server/gen_fsm。当您不需要响应时,异步消息的真正威力就实现了,因此您可以触发并忘记;没有任何障碍,所以它非常有效。一旦您需要响应,您要么发送一条消息,然后等待接收响应,然后再继续(这就是 gen_server 调用为您所做的),或者您也可以异步发送消息和接收。 如果您想要 OTP '合规性',尽管您的服务器将是 gen_server 或 gen_fsm,并且您可以调用,因此无需重新发明它。对于许多需要来自所有服务器的响应的 gen_servers,在将这些响应用于某事之前,合乎逻辑的答案类似于我后面几段中的代码。

以上是关于Erlang,尝试制作 gen_server: 调用有很多响应的主要内容,如果未能解决你的问题,请参考以下文章

Erlang:无法创建 gen_server:call()

在 Erlang 的 gen_server 中实现代码交换

Erlang:gen_server 还是我自己的自定义服务器?

难以理解 Erlang Gen_Server 架构

何时在 Erlang/OTP 应用程序中使用 gen_server

具有长时间运行任务的 Erlang gen_server