Apache Storm 工作人员之间的高消息传递延迟

Posted

技术标签:

【中文标题】Apache Storm 工作人员之间的高消息传递延迟【英文标题】:Apache Storm High Messaging Delay between Workers 【发布时间】:2018-01-24 02:00:25 【问题描述】:

我正在使用 Apache Storm 开发一个需要超低延迟的实时图像处理应用程序。在拓扑定义中,单个 spout 将在每 1 秒内发出原始图像(5MB),并且一些螺栓将处理它们。每个bolt的处理延迟是可以接受的,整体计算延迟可以在150ms左右。

但是,我发现不同节点上的工作人员之间的消息传递延迟确实很高。 5 个连续螺栓的总体延迟约为 200 毫秒。 为了计算此延迟,我从端到端延迟中减去所有任务延迟。此外,我实现了一个定时器螺栓,其他处理螺栓将在这个定时器螺栓中注册,以在开始真正的处理之前记录时间戳。通过比较螺栓的时间戳,我发现每个螺栓之间的延迟很高,正如我之前注意到的那样。

为了分析这种高附加延迟的来源,我首先将发送间隔减少到 1 秒,因此不应存在由于高计算开销而导致的排队延迟。此外,从 Storm UI 中,我发现没有一个螺栓处于高 CPU 利用率。

然后,我检查了网络延迟。我正在使用 1Gbps 网络测试平台,并通过 RTT 和带宽测试网络。发送 5MB 图像的网络延迟不应该那么高。

最后,我在考虑缓冲延迟。我发现每个线程都维护自己的发送缓冲区并将数据传输到工作线程的发送缓冲区。我不确定接收器螺栓需要多长时间才能收到此发送消息。根据社区的建议,我将发送方/接收方缓冲区大小增加到 16384,将 STORM_NETTY_MESSAGE_BATCH_SIZE 修改为 32768。但是,它没有帮助。

我的问题是如何去除/减少bolts之间的消息传递开销?(inter worker)可以同步bolts之间的通信并让接收者立即收到发送消息而没有任何延迟?

【问题讨论】:

消息中是否包含图像,或者它们只是指向图像的指针? 是的,消息中包含整张图片。 【参考方案1】:

对于低延迟,您可能需要调整 netty 缓冲区和传输批量大小。由于当前工作人员的消息传递和线程模型,这种延迟可能是固有的。

还可以尝试调整破坏者配置:

    topology.disruptor.wait.timeout.millis topology.disruptor.batch.size topology.disruptor.batch.timeout.millis

也就是说,社区正在努力通过重新设计消息传递子系统来改善延迟和吞吐量。见https://github.com/apache/storm/pull/2502

【讨论】:

感谢 Arunmahadevan 的回复。通过消息传递代码,我发现 topology.disruptor.batch.size 是触发刷新队列的阈值。值是 50,我认为足够小。此外,topology.disruptor.batch.timeout.millis 是中断器检查刷新条件的时间间隔。默认值为 1ms,这是 Storm 接受的最小值。除了您提到的值之外,我还试图了解与 netty 相关的参数并了解如何调整它们。很高兴看到社区将改进消息传递的实施。【参考方案2】:

通过在 Storm 的源代码中插入时间戳的详细基准测试,我发现在传递两个 1440x1080 图像时,“序列化”步骤最多需要 30 毫秒。如果我纯粹将一个字节数组传递给一个元组,我认为这一步可以删除,从而减少延迟......

【讨论】:

【参考方案3】:

根据您上面的评论,您在每条消息中包含大约 5MB 的图像。

我不太了解kafka/storm,但我的理解是它是一个主流的消息代理。此类系统并非旨在处理大型有效负载,主要是因为它们提供了有关交付和持久性的保证,这两者都需要某些处理步骤来缓冲字节流,在大多数情况下会多次缓冲。这会导致您的延迟随着大小的增加而出现大于线性的时间增长。

我的建议是将图像存储在 Couchbase 或 Memcached 之类的快速文件中,然后发送包含指向它的指针的消息。这样的设置在一天之内启动并运行起来并不难。

【讨论】:

您好 theMayer,感谢您的建议。我担心的是,Storm 上传递的图像将在每一步都被修改,而不是固定的源。因此,它还需要每次更新缓存内容或将其保存到数据库中,然后获取更新的图像。本质上它必须从远程服务器传输图像? 那你为什么要谈论网络传输时间呢?

以上是关于Apache Storm 工作人员之间的高消息传递延迟的主要内容,如果未能解决你的问题,请参考以下文章

Apache - Storm

课程预告:大数据实时处理系统Apache Storm

Storm序列化

Storm通信机制

STORM_0010_Message passing implementation/消息传递的实现

Apache Storm java.nio.channels.ClosedChannelException: null