无法在 GenServer 中重现“启动/循环”行为
Posted
技术标签:
【中文标题】无法在 GenServer 中重现“启动/循环”行为【英文标题】:Can't reproduce "start/loop" behaviour in GenServer 【发布时间】:2018-01-04 02:59:01 【问题描述】:我正在尝试使用一个进程作为同步机制,它可以接收我们指定的消息,但仍然可以正常运行。我已经设法用简单的流程实现了我的问题的简化版本,但是我无法使用 GenServer 实现相同的效果。
简化版是这样的:
defmodule Fun do
def start_link do
spawn(fn -> loop(:initiated) end)
end
def loop(state) do
receive do
:join when state == :initiated ->
IO.inspect("Handling join")
loop(:initiated)
:finish when state == :initiated ->
IO.inspect("Finishing")
loop(:finishing)
:cleanup when state == :finishing ->
IO.inspect("Cleaning up...")
end
end
end
上面只会在state
为:initiated
时处理:join
和:finish
消息,只有在收到:finish
时才执行:cleanup
并退出。在这里,我试图利用卡在邮箱中的消息,直到它们可以匹配。
它是这样工作的:
iex(1)> pid = Fun.start_link
#PID<0.140.0>
iex(2)> send(pid, :join)
"Handling join"
:join
iex(3)> send(pid, :join)
"Handling join"
:join
iex(4)> send(pid, :cleanup) # No `IO.inspect` at this point
:cleanup
iex(5)> send(pid, :finish) # Two `IO.inspect`s once `:finish` received
"Finishing"
:finish
"Cleaning up..."
我试图用 GenServer 重现相同的行为:
defmodule FunServer do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
:ok, :initiated
end
def handle_info(:join, msg) when msg == :initiated do
IO.inspect("Handling join [From GenServer]")
:noreply, :initiated
end
def handle_info(:finish, msg) when msg == :initiated do
IO.inspect("Finishing [From GenServer]")
:noreply, :finishing
end
def handle_info(:cleanup, msg) when msg == :finishing do
IO.inspect("Cleaning up [From GenServer]")
:stop, :normal, msg
end
end
鉴于我将此 GenServer 配置为 :temporary
worker
我的应用程序,它的工作方式如下:
iex(1)> send(FunServer, :join)
"Handling join [From GenServer]"
:join
iex(2)> send(FunServer, :cleanup)
:cleanup
iex(3)>
07:11:17.383 [error] GenServer FunServer terminating
** (FunctionClauseError) no function clause matching in
FunServer.handle_info/2
(what_the_beam) lib/fun_server.ex:22: FunServer.handle_info(:cleanup, :initiated)
(stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:686: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: :cleanup
State: :initiated
我尝试过使用handle_cast
回调,以及不同格式的参数,例如:
handle_info(:cleanup, :finishing)
或
handle_cast(:cleanup, :finishing)
而不是where msg == :finishing
,但它们都不适合我。
我正在使用 Elixir 1.5 和 Erlang 20。
【问题讨论】:
【参考方案1】:使用:gen_statem
(或Elixir 包装库,在十六进制上以gen_state_machine
找到)更容易实现您想要实现的目标。您可以使用“延迟”功能模拟选择性接收,该功能会将消息放回内部缓冲区,直到机器状态发生变化,此时延迟的消息将按照接收顺序进行处理。
还有其他一些不错的技巧,比如能够生成“内部”消息,这些消息在处理其他任何事情之前放置在邮箱的头部。由于您的示例是一个非常明确的 FSM 案例,因此我建议您走这条路,而不是在 GenServer 中重新发明它。
【讨论】:
感谢您的建议。我还没有玩过gen_state_machine
,但是我已经阅读了文档,它看起来是解决特定问题的好方法。我会在空闲的时候尝试在更多细节中增加它。暂时我将接受 ash 的回答,因为该回答详细说明了我遇到的问题。【参考方案2】:
您遇到的问题称为常规 erlang 进程的选择性接收。这是乔·阿姆斯特朗 (Joe Armstrong) 书中的一句话:
receive works as follows:
...
2. Take the first message in the mailbox and try to match it
against Pattern1, Pattern2, and so on. If the match succeeds,
the message is removed from the mailbox, and the expressions
following the pattern are evaluated.
3. If none of the patterns in the receive statement matches the
first message in the mailbox, then the first message is removed
from the mailbox and put into a “save queue.” The second message
in the mailbox is then tried. This procedure is repeated until a
matching message is found or until all the messages in the mail-
box have been examined.
4. If none of the messages in the mailbox matches, then the process
is suspended and will be rescheduled for execution the next time
a new message is put in the mailbox. Note that when a new message
arrives, the messages in the save queue are not rematched; only
the new message is matched.
5. As soon as a message has been matched, then all messages that
have been put into the save queue are reentered into the mailbox
in the order in which they arrived at the process. If a timer
was set, it is cleared.
6. If the timer elapses when we are waiting for a message, then
evaluate the expressions ExpressionsTimeout and put any saved
messages back into the mailbox in the order in which they
arrived at the process.
gen_server 不能以这种方式工作。它需要您的回调模块来匹配一条消息,或者当您发现由于 gen_server 实现找不到调度方式而引发错误时。如果您希望 gen_server 实现与上述接收逻辑相匹配,则必须手动执行。一种简单的方法是在某种列表中累积在给定状态下无法匹配的所有消息,然后在每次成功匹配后将它们重新发送到 self()。为此,您的状态不能再只是一个简单的原子,因为您需要自己组织已保存的队列。
顺便说一句,之前在 erlang 的上下文中也问过同样的问题。响应者的建议与我所描述的内容是一一对应的。因此,如果您需要具体代码,这里是该问题的链接:How do you do selective receives in gen_servers?
【讨论】:
感谢您提供如此详尽的答案。我不知道这种优化——我的印象是当新消息到达时整个队列都会被重新评估(这是我第一次成功尝试解决问题的基础)。你在这里引用的乔的书是什么?【参考方案3】:在您的原始代码中,当状态为 :initiated
时,您的 receive
将忽略 :cleanup
消息。这是一种不好的做法,可能会导致瓶颈,因为任何此类消息都将永远留在进程的收件箱中,耗尽内存并减慢未来的 receive
块,因为 receive
(通常)花费的时间与数量成正比进程收件箱中的消息。
GenServer
通过强制您按照收到的顺序处理消息来正确处理这种情况。为了忽略消息,您需要显式添加一个不执行任何操作的handle_info
。您可以简单地添加此子句,当状态为 :initiated
时将忽略 :cleanup
:
def handle_info(:cleanup, :initiated), do: :noreply, :initiated
您也可以通过在所有现有 handle_info
之后添加此子句来忽略任何其他消息:
def handle_info(_message, state), do: :noreply, state
我尝试过使用 handle_cast 回调,以及不同格式的参数,例如:
handle_info(:cleanup, :finishing)
...
如果您仔细阅读错误消息中的堆栈跟踪,问题是:gen_server
尝试调用FunServer.handle_info(:cleanup, :initiated)
但handle_info
中没有定义子句来处理它。 (:cleanup, :finishing)
没有问题。
【讨论】:
您好,感谢您的回复。我完全清楚如果未处理邮件将被放回邮箱,以及检查邮箱的工作原理。我知道,我的应用程序在它的生命周期内只收到这种:finishing
消息之一,然后它就死了。基于此,我开始围绕接收无序消息的问题来建模我的解决方案。这可以在“没有 GenServer”的情况下很好地解决,但希望在“GenServer”中复制它,因此它可以通过 OTP 和监督得到很好的回报。这是否意味着根本无法复制这种行为?
不会添加回退handle_info
子句(如答案所示)使其行为与您的原始代码完全相同?您的代码还会按顺序处理消息,只是它会根据状态忽略一些消息。
嗯,不完全是 - def handle_info(_message, state)
将丢弃该消息,而在我的第一个示例中,您可以看到 "Cleaning up..."
呈现 之后 "Finishing"
,即使你,@ 之前 :finish
已收到 987654343@ 消息。 :cleanup
只是礼貌地在邮箱中等待处理
哦,现在我明白了。你说得对。好吧,据我所知,您不能让 gen_server 保存这样的消息。如果我必须为此使用 gen_server,我会在状态中保留一个布尔值,用于存储 cleanup
是否已被调用,并适当地更改其他处理程序的逻辑。
感谢您为评估我的问题所做的努力!感谢您的最终确认,因为我不清楚 - 为什么我无法使用 GenServer 获得类似的结果。我会接受 ash 的回答,因为这篇回答详细说明了我遇到的问题。以上是关于无法在 GenServer 中重现“启动/循环”行为的主要内容,如果未能解决你的问题,请参考以下文章
在 Elixir ExUnit 中,我如何保证 Supervisor 将创建一个新的 GeNserver?