你能溢出 Erlang 进程的消息队列吗?

Posted

技术标签:

【中文标题】你能溢出 Erlang 进程的消息队列吗?【英文标题】:Could you overflow the message queue of an Erlang process? 【发布时间】:2015-02-16 00:27:42 【问题描述】:

我还在学习 Erlang,所以我可能错了,但这就是我对进程消息队列的理解。

一个进程可能在它的主接收循环中,接收某些类型的消息,而稍后它可能被置于一个等待循环中,以在第二个循环中处理不同类型的消息。如果该进程将在第二个循环中接收用于第一个循环的消息,它会将它们放入队列中,暂时忽略它们,并且只处理它可以在当前循环中匹配的那些消息。现在如果它再次进入第一个接收循环,它将从头开始,并再次处理它可以匹配的消息。

现在我的问题是,如果这就是 Erlang 的工作方式并且我理解正确,那么当恶意进程发送一堆该进程永远不会处理的消息时会发生什么。队列最终会溢出,导致进程崩溃或者我应该如何处理?我将打出一个例子来说明我的意思。

现在,如果恶意程序获取了 Pid 并重复发送Pid ! malicioudata, LotsOfData,这些消息是否会被过滤掉,因为它们永远不会被处理,或者它们只会堆积在队列中?

startproc() -> firstloop(InitValues).

firstloop(Values) ->
  receive
    retrieveinformation ->
      WaitingList=askforinformation(),
      retrieveloop(WaitingList);
    dostuff ->
      NewValues=doingstuff(),
      firstloop(NewValues);
    sendmeyourdata ->
      sendingdata(Values),
      firstloop(Values)
  end.

retrieveloop([],Values) -> firstloop(Values).
retrieveloop(WaitingList,Values) ->
  receive
    hereismyinformation,Id,MyInfo ->
      NewValues=dosomethingwithinfo(Id,MyInfo),
      retrieveloop(lists:remove(Id,1,WaitingList),NewValues);
  end.

【问题讨论】:

我猜在最好的情况下你会耗尽内存。但我的问题是; “恶意程序”如何与 erlang 的 PID 交互,攻击者是否嗅探了您节点的 cookie?在这种情况下,队列溢出是可能发生的最不坏的事情。在内存泄漏的情况下,如果 erlang 因内存问题而崩溃,那么您可以读取生成的崩溃转储。 @Evalon 在 vanilla 案例中,Erlang 被设计为在受信任的环境中运行。还有其他集群方法不做这个假设,但它们是附加组件,而不是普通的 Erlang 运行时配置。假设是您直接控制您正在操作的集群。在您认为“啊,这太糟糕了”之前,请意识到 在 Erlang 中进行套接字编程真的很容易。所以几乎任何你可以想象的微服务元服务都可以在 Erlang 中相对轻松地构建,构建集群的集群。 【参考方案1】:

消息数量没有硬性限制,也没有固定的内存限制,但是如果你有数十亿条消息(或者一些超级大的消息,也许)。

在您因为一个巨大的邮箱而 OOM 之前很久,您会注意到 selective receives taking a long time(并不是说“选择性接收”是大部分时间遵循的好模式......)或无辜地窥视进程邮件队列并 @ 987654322@.

这在 Erlang 世界中通常被视为一个节流和监控问题。如果你无法跟上并且你的问题是可并行的,那么你需要更多的工人。如果您要最大限度地利用硬件,那么您需要更高的效率。如果您仍在最大限度地使用您的硬件,无法再获得更多,并且您仍然不知所措,那么您需要决定如何实施推回或减载。

【讨论】:

@Evalon 对该主题的精彩讨论。并用友好的图片将讨论分解到我的水平! :-)【参考方案2】:

不幸的是,没有“消息队列溢出”,并且它会不断增长,直到 VM 由于内存分配错误而崩溃。

解决方案是在主循环中删除任何无效消息,因为由于进程的阻塞性质,您不会收到hereismyinformation, _,_askforinformation() 中的任何消息。

startproc() -> firstloop(InitValues).

firstloop(Values) ->
  receive
    retrieveinformation ->
      WaitingList=askforinformation(),
      retrieveloop(WaitingList, Values); % i assume you meant that
    dostuff ->
      NewValues=doingstuff(),
      firstloop(NewValues);
    sendmeyourdata ->
      sendingdata(Values),
      firstloop(Values);
    _ -> 
      firstloop(Values) % you can't get hereismyinformation, _,_ here so we can drop any invalid message
  end.

retrieveloop([],Values) -> firstloop(Values).
retrieveloop(WaitingList,Values) ->
  receive
    hereismyinformation,Id,MyInfo ->
      NewValues=dosomethingwithinfo(Id,MyInfo),
      retrieveloop(lists:remove(Id,1,WaitingList),NewValues);
  end.

意外消息并不是真正的问题,因为它很容易避免,但是当进程队列的增长速度超过处理速度时。对于这个特定的问题,有一个很好的jobs framework 用于生产系统。

【讨论】:

问题是,当接收到'retrieveinformation'时,它应该询问信息,更新自己的值,然后才重新开始处理其他消息。因此,如果有人询问进程的值,进程将首先完成更新,并且只有在完成后才发送新值。那么是不是应该加上“RequestingStateList”的第三个参数,然后当状态更新时,我会迭代那个列表,发送当前状态什么的? @PeterRaeves 我没明白。我确实编辑了我的答案。如果您想像以前那样阻止它并丢弃意外消息,则只需在代码中添加一行。只是想你也希望它是异步的。 嗯。就是这么简单啊……这确实是我需要的。你是对的,我不会期望hereismyinformation 消息在第一个循环中,如果它们被发送,由于莫名其妙的原因,那么它们可以安全地被丢弃。谢谢! @PeterRaeves 没问题,没那么难猜 ;)

以上是关于你能溢出 Erlang 进程的消息队列吗?的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 消息队列

消息队列经典十连问,你能扛到第几问?

Erlang进程发送消息

消息队列:我们可以在消息进入队列时触发事件吗?

RabbitMQ消息队列集群

RabbitMQ消息队列集群配置