LMAX Disruptor 依赖图/带有 SequenceBarrier 的门控

Posted

技术标签:

【中文标题】LMAX Disruptor 依赖图/带有 SequenceBarrier 的门控【英文标题】:LMAX Disruptor Dependency Graph/Gating with SequenceBarrier 【发布时间】:2017-09-30 08:41:59 【问题描述】:

目标

我正在尝试在处理程序之间创建一种有点循环的依赖关系,但我不知道如何正确处理它。我想要实现的是producer -> [handlers 1-3] -> handler 4 的变体。

所以,disruptor.handleEventsWith(h1, h2, h3).then(h4);。但我有额外的要求,

    虽然处理程序 1-3 确实并行处理消息,但它们都不会开始处理下一条消息,直到它们都完成了上一条消息。 在第一条消息之后,处理程序 1-3 等待处理程序 4 完成最新消息,然后再处理下一条消息。

使用单个事件处理程序的等效执行逻辑可能是:

disruptor.handleEventsWith((event, sequence, endOfBatch) -> 
  Arrays.asList(h1, h2, h3).parallelStream()
        .forEach(h -> h.onEvent(event, sequence, endOfBatch));
  h4.onEvent(event, sequence, endOfBatch);
);

上下文

设计上下文是,处理程序 1-3 每个都根据消息更新自己的状态,并且在三个处理程序处理完消息后,它们处于一致状态。处理程序 4 然后根据处理程序 1-3 更新的状态运行一些逻辑。所以处理程序 4 应该只看到由 1-3 维护的数据结构的一致状态,这意味着处理程序 1-3 不应该在处理程序 4 完成之前处理下一条消息。

(虽然目标肯定是使用 Disruptor 来管理并发,而不是java.util.Stream。)

不确定它是否重要,但处理程序 4 的逻辑也可以分为两部分,一个要求不更新处理程序 1-3,下一个只要求处理程序 4 的第一部分具有完成的。因此,处理程序 1-3 可以在处理程序 4 的第二部分仍在执行时处理消息。

有没有办法做到这一点?还是我的设计有缺陷?我觉得应该有办法通过SequenceBarrier 做到这一点,但我不太明白如何实现这个自定义屏障。对于处理程序 1-3,我想我想用逻辑 handlers[1:3].lastProcessedSequence() == handlers[4].lastProcessedSequence() 设置障碍,但我不确定该逻辑放在哪里。

谢谢!

【问题讨论】:

【参考方案1】:

我会考虑让处理程序是无状态的,并使用它们处理的消息来包含系统的状态。这样您就根本不需要同步处理程序。

【讨论】:

以上是关于LMAX Disruptor 依赖图/带有 SequenceBarrier 的门控的主要内容,如果未能解决你的问题,请参考以下文章

Disruptor极速队列

从 LinkedBlockingQueue 迁移到 LMAX 的 Disruptor

disruptor 单生产者多消费者

LMAX Disruptor 最简单实际的示例代码

LMAX Disruptor Timeout EventHandler

监控 LMAX Disruptor