Erlang,尝试制作 gen_server: 调用有很多响应
Posted
技术标签:
【中文标题】Erlang,尝试制作 gen_server: 调用有很多响应【英文标题】:Erlang, try to make gen_server: call with many responses 【发布时间】:2016-01-13 22:04:26 【问题描述】:尝试在项目中使用 OTP-style 并得到一个 OTP-interface 问题。哪种方案更受欢迎/更美观?
我有什么:
-
带有
mochiweb
的网络服务器
一个进程,产生许多 (1000-2000) 个子进程。
儿童包含状态(netflow-speed)。如果需要,处理给孩子的代理消息并创建新的孩子。
在 mochiweb 中,我有一页显示所有演员的速度,乳清是如何制作的:
nf_collector ! get_abonents_speed, self(),
receive
abonents_speed_count, AbonentsCount ->
ok
end,
%% write http header, chunked
%% and while AbonentsCount != 0, receive speed and write http
这不是选择风格,我怎么能理解。解决方案:
-
在 API 同步函数中获取所有速度的请求并返回所有速度的列表。但我想马上写给客户。
API-function 的一个参数是回调:
nf_collector:get_all_speeds(fun (Speed) -> Resp:write_chunk(templater(Speed)) end)
返回迭代器:
get_all_speeds 的结果之一将与接收块一起使用。每次调用都会返回ok, Speed
,最后返回end
。
get_all_speeds() ->
nf_collector ! get_abonents_speed, self(),
receive
abonents_speed_count, AbonentsCount ->
ok
end,
ok, fun() ->
create_receive_fun(AbonentsCount)
end.
create_receive_fun(0)->
end;
create_receive_fun(Count)->
receive
abonent_speed, Speed ->
Speed
end,
ok, Speed, create_receive_fun(Count-1).
【问题讨论】:
真正的问题是什么?您是否在实施任何选项时遇到困难,如果是,您应该询问与此相关的问题。否则,这将主要是完全取决于您的用例的意见。 我同意 Adam 的观点,并且认为这更像是一个设计问题,但没有足够的信息提供任何建议。为什么有 1000-2000 个包含状态的孩子。为什么有进程返回计数并发出调用而不是返回那些孩子并让调用者决定做什么?读/写比率是多少?什么是非功能性需求,比如低延迟或吞吐量更重要?它是主要功能还是占系统其余部分的比例?等等。如果没有其他信息,这对我来说没有多大意义。 写这个问题的原因很简单:erlang 提供了编写基于actor的程序的简单方法,OTP 提供了标准化。首先,我编写了没有 OTP 的程序,理解程序逻辑很复杂。添加 OTP 后,它变得平坦而简单。在这里,我遇到了一种更复杂的行为,即同步/异步调用。我问路,如果遇到类似的问题,其他 erlang 开发人员会选择什么。或者这个问题是由糟糕的设计引起的,答案是 - 做简单,没有 1000 个(或更多)演员 尝试改写:如果我的电话与许多演员联系在一起,那么我必须处理许多情况:1. 当我的电话接收消息时,一个或多个演员死亡 2. 呼叫发起者死亡 3. 造成 2一次 -3 次以上的呼叫 4. 其他... 哪里有任何 OTP 行为、良好的编码标准或关于我如何做到这一点的建议?其他查看我的代码的 erlang 编码人员的回答并没有开始瞎忙 :) 如果不是,真的,我可以通过多种方式做到这一点,所以......我可以关闭这个问题 【参考方案1】:从主管那里生出你的“孩子”:
-module(ch_sup).
-behaviour(supervisor).
-export([start_link/0, init/1, start_child/1]).
start_link() -> supervisor:start_link(local, ?MODULE, ?MODULE, []).
init([]) -> ok, simple_one_for_one, [ch, ch, start_link, [], transient, 1000, worker, [ch]].
start_child(Data) -> supervisor:start_child(?MODULE, [Data]).
用 ch_sup:start_child/1 启动它们(数据是什么)。
将您的孩子实现为 gen_server:
-module(ch).
-behaviour(gen_server).
-record(?MODULE, speed).
...
get_speed(Pid, Timeout) ->
try
gen_server:call(Pid, get, Timeout)
catch
exit:timeout, _ -> timeout;
exit:noproc, _ -> died
end
.
...
handle_call(get, _From, St) -> reply, ok, St#?MODULE.speed, St end.
您现在可以使用主管获取正在运行的孩子的列表并查询他们,但您必须接受在获取孩子列表和调用他们之间有孩子死亡的可能性,显然孩子可能出于某种原因活着但没有响应,或响应错误等。
上面的 get_speed/2 函数返回 ok, Speed 或 dead 或 timeout。您可以根据自己的应用需求进行适当的过滤;通过列表理解很容易,这里有一些。
只是速度:
[Speed || ok, Speed <- [ch:get_speed(Pid, 1000) || Pid <-
[Pid || undefined, Pid, worker, [ch] <-
supervisor:which_children(ch_sup)
]
]].
Pid 和速度元组:
[Pid, Speed || Pid, ok, Speed <-
[Pid, ch:get_speed(Pid, 1000) || Pid <-
[Pid || undefined, Pid, worker, [ch] <-
supervisor:which_children(ch_sup)]
]
].
所有结果,包括超时和在您到达之前死亡的孩子的“死亡”结果:
[Pid, Any || Pid, Any <-
[Pid, ch:get_speed(Pid, 1000) || Pid <-
[Pid || undefined, Pid, worker, [ch] <-
supervisor:which_children(ch_sup)]
]
].
在大多数情况下,您几乎可以肯定除了速度之外不想要任何东西,因为您将如何处理死亡和超时?你希望那些死去的人由主管重生,所以问题在你知道的时候或多或少地得到解决,而超时,就像任何错误一样,是一个单独的问题,可以用你看到的任何方式处理适合...不过,无需将故障修复逻辑与数据检索逻辑混合在一起。
现在,我认为您在帖子中提到的所有这些问题,但我不太确定,1000 的超时是针对每个调用的,并且每个调用是一个接一个地同步的,因此对于 1000 个超时时间为 1 秒的孩子,可能需要 1000 秒才能产生任何结果。让时间超时 1ms 可能是答案,但要正确地做到这一点有点复杂:
get_speeds() ->
ReceiverPid = self(),
Ref = make_ref(),
Pids = [Pid || undefined, Pid, worker, [ch] <-
supervisor:which_children(ch_sup)],
lists:foreach(
fun(Pid) -> spawn(
fun() -> ReceiverPid ! Ref, ch:get_speed(Pid, 1000) end
) end,
Pids),
receive_speeds(Ref, length(Pids), os_milliseconds(), 1000)
.
receive_speeds(_Ref, 0, _StartTime, _Timeout) ->
[];
receive_speeds(Ref, Remaining, StartTime, Timeout) ->
Time = os_milliseconds(),
TimeLeft = Timeout - Time + StartTime,
receive
Ref, acc_timeout ->
[];
Ref, ok, Speed ->
[Speed | receive_speeds(Ref, Remaining-1, StartTime, Timeout)];
Ref, _ ->
receive_speeds(Ref, Remaining-1, StartTime, Timeout)
after TimeLeft ->
[]
end
.
os_milliseconds() ->
OsMegSecs, OsSecs, OsMilSecs = os:timestamp(),
round(OsMegSecs*1000000 + OsSecs + OsMilSecs/1000)
.
这里每个调用都在不同的进程中产生并收集回复,直到“主超时”或全部收到。
代码大部分是从我的各种作品中剪切和粘贴的,并通过手动和搜索替换进行编辑,以匿名并删除多余的,所以它可能主要是可编译的质量,但我不保证我没有不要破坏任何东西。
【讨论】:
感谢您的精彩回答!你给每个孩子打电话。我认为对每个孩子进行异步调用并获得结果。 (或者如果使用 OTP 规范,则增加并行同步调用,就像你通过产生以上是关于Erlang,尝试制作 gen_server: 调用有很多响应的主要内容,如果未能解决你的问题,请参考以下文章
Erlang:gen_server 还是我自己的自定义服务器?