LMAX Disruptor - what determines the batch size?

Posted: 2016-02-16 10:56:55

[Question]

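I have been learning about the LMAX Disruptor lately and running some experiments. One thing that puzzles me is the endOfBatch parameter of the EventHandler's onEvent method. Consider the following code. First, the dummy message and consumer classes, which I call Test1 and Test1Worker: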

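import com.lmax.disruptor.EventHandler;

// Dummy message class; it carries no payload.
public class Test1 {
}

// Consumer that simulates slow work and logs every event it receives.
public class Test1Worker implements EventHandler<Test1> {
    @Override
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
        try {
            Thread.sleep(500); // stand-in for real work
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Received message with sequence " + sequence + ". "
                + "EndOfBatch = " + endOfBatch);
    }
}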

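Note that I have put in a 500 ms delay as a substitute for some real-world work. I also print the sequence number of each message to the console.

Then my driver class (which acts as the producer), called DisruptorTest: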

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DisruptorTest {

    private static Disruptor<Test1> bus1;

    private static ExecutorService test1Workers;

    public static void main(String[] args) {
        test1Workers = Executors.newFixedThreadPool(1);

        // Ring buffer with 8 slots and a single consumer thread.
        bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);
        bus1.handleEventsWith(new Test1Worker());
        RingBuffer<Test1> buf1 = bus1.start();

        for (int i = 0; i < 10; i++) {
            // Measure how long the producer waits to claim the next slot.
            long a = System.currentTimeMillis();
            long next = buf1.next();
            long b = System.currentTimeMillis();
            System.out.println("Delay for claiming slot " + i + " is " + (b - a));
            try {
                Test1 message = buf1.get(next);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                buf1.publish(next);
            }
        }
    }

    public static class Test1Factory implements EventFactory<Test1> {
        public Test1 newInstance() {
            return new Test1();
        }
    }
}

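Here, after initializing the required pieces, I feed 10 messages into the RingBuffer (buffer size 8) and monitor a few things: the delay the producer sees when claiming the next slot in the RingBuffer, the messages and their sequence numbers on the consumer side, and whether a particular sequence is treated as the end of a batch.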

Now, interestingly, with the 500 ms delay in processing each message, this is the output I get:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Delay for claiming slot 2 is 0
Delay for claiming slot 3 is 0
Delay for claiming slot 4 is 0
Delay for claiming slot 5 is 0
Delay for claiming slot 6 is 0
Delay for claiming slot 7 is 0
Received message with sequence 0. EndOfBatch = true
Delay for claiming slot 8 is 505
Received message with sequence 1. EndOfBatch = false
Received message with sequence 2. EndOfBatch = false
Received message with sequence 3. EndOfBatch = false
Received message with sequence 4. EndOfBatch = false
Received message with sequence 5. EndOfBatch = false
Received message with sequence 6. EndOfBatch = false
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 9 is 3519
Received message with sequence 8. EndOfBatch = true
Received message with sequence 9. EndOfBatch = true

But if I remove the 500 ms wait, this is what I get:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Received message with sequence 0. EndOfBatch = true
Received message with sequence 1. EndOfBatch = true
Delay for claiming slot 2 is 0
Received message with sequence 2. EndOfBatch = true
Delay for claiming slot 3 is 0
Received message with sequence 3. EndOfBatch = true
Delay for claiming slot 4 is 0
Received message with sequence 4. EndOfBatch = true
Delay for claiming slot 5 is 0
Received message with sequence 5. EndOfBatch = true
Delay for claiming slot 6 is 0
Received message with sequence 6. EndOfBatch = true
Delay for claiming slot 7 is 0
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 8 is 1
Delay for claiming slot 9 is 0
Received message with sequence 8. EndOfBatch = false
Received message with sequence 9. EndOfBatch = true  

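So it looks like whether a given message is considered the end of a batch (that is, the size of the batch) is influenced by the consumer's message processing delay. Maybe I am being silly here, but is that how it should be? What is the reasoning behind it? And what determines the batch size in general? Thanks in advance. Let me know if anything in my question is unclear.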

[Comments]

If you want more detail, you should probably watch some of Aleksey Shipilev's videos on JVM benchmarking - shipilev.net

[Answer 1]

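The batch size is determined solely by the number of elements available. If more elements are available at that moment, they are included in the batch. For example, if the Disruptor calls your code and there is only one element in the queue, you get a single call with endOfBatch=true. If there are 8 elements in the queue, it collects all 8 and delivers them as one batch: eight onEvent calls, with endOfBatch=true only on the last one.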

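You can see in the code below that the number of entries "available" in the queue is fetched, and it may be well beyond the "next" entry. For example, if you are currently at 5, waiting for slot 6, and then 3 events arrive, available will be 8 and you will receive multiple calls (for 6, 7 and 8) in one batch.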

https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java#L124

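// Wait until at least nextSequence is published; the result may be further ahead.
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    // endOfBatch is true only for the last sequence that was available.
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}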

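To make the loop above concrete, here is a minimal plain-Java model of it. It does not use any Disruptor classes; the class name BatchLoopSketch and the method endOfBatchFlags are made up for illustration, and the two sequence arguments stand in for the nextSequence and availableSequence variables in the excerpt.

```java
// Plain-Java model of the batching loop; no Disruptor classes are used.
final class BatchLoopSketch {

    // Returns one flag per delivered event: true only for the last event of the batch.
    // nextSequence: first sequence the consumer has not handled yet.
    // availableSequence: highest sequence published at the moment the consumer looked.
    static boolean[] endOfBatchFlags(long nextSequence, long availableSequence) {
        int batchSize = (int) Math.max(0, availableSequence - nextSequence + 1);
        boolean[] flags = new boolean[batchSize];
        for (int i = 0; i < batchSize; i++) {
            long sequence = nextSequence + i;
            flags[i] = (sequence == availableSequence);
        }
        return flags;
    }

    public static void main(String[] args) {
        // Slow consumer: while sequence 0 was being handled, 1..7 were published.
        System.out.println(java.util.Arrays.toString(endOfBatchFlags(1, 7)));
        // Fast consumer: only one event is waiting each time it looks.
        System.out.println(java.util.Arrays.toString(endOfBatchFlags(2, 2)));
    }
}
```

The first call gives six false flags followed by one true, which matches sequences 1 to 7 in the slow run. The second gives a single true, which matches most lines of the fast run.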
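Regarding the 500 ms pause on the ninth element (slot 8 in your output), note that the Disruptor is built on a ring buffer, and you have specified the number of slots in the buffer as 8 (see the second parameter here):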

bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);  

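If not all consumers have consumed an element and the ring buffer is full (all 8 slots are occupied), the producer is blocked from publishing new events to the buffer. You can try increasing the buffer size, say to about 2 million objects (the ring buffer size must be a power of two, so for example 2,097,152), or make sure your consumer is faster than the producer so that the queue does not fill up (by removing the sleep, as you have already demonstrated).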
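The "buffer is full" rule can be sketched in a few lines of plain Java. This is a simplified model under stated assumptions, not Disruptor code: RingCapacitySketch, slotIndex and canClaim are hypothetical names, there is a single consumer, and consumedSequence is the highest sequence that consumer has reported as done (-1 before anything is consumed).

```java
// Simplified model of ring buffer capacity; not the Disruptor implementation.
final class RingCapacitySketch {

    // Slot index for a sequence; requires bufferSize to be a power of two.
    static int slotIndex(long sequence, int bufferSize) {
        return (int) (sequence & (bufferSize - 1));
    }

    // True if the producer may claim 'sequence' without overwriting an unread slot.
    // The slot was last used by (sequence - bufferSize), which must already be consumed.
    static boolean canClaim(long sequence, long consumedSequence, int bufferSize) {
        return sequence - bufferSize <= consumedSequence;
    }

    public static void main(String[] args) {
        int bufferSize = 8;
        // Nothing consumed yet: sequences 0..7 fit, sequence 8 does not.
        System.out.println(canClaim(7, -1, bufferSize)); // true
        System.out.println(canClaim(8, -1, bufferSize)); // false
        // Once sequence 0 is reported as consumed, sequence 8 can reuse its slot.
        System.out.println(canClaim(8, 0, bufferSize));  // true
        System.out.println(slotIndex(8, bufferSize));    // 0
    }
}
```

With 8 slots, the first eight claims return immediately and the ninth has to wait for the consumer, which is the pattern in the slow run of the question.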

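[Comments]

Thanks for the answer. Now I have a clear picture of the batch size. However, I am now confused about the producer's wait time. Note that in my example (with the 500 ms delay), the producer waited about 3500 ms (500 * 7) before claiming slot 9 (that is, slot 2 after the modulo). I wonder why the producer did not claim slot 9 as soon as the consumer had received sequence 2. Why did it wait until sequence 7 was consumed? I know this is a bit off topic from the original question, but I am still curious.

@AsifIqbal, this is where WaitStrategies come in. By default (as in your code) the BlockingWaitStrategy is used, and that is how it behaves. You can change the WaitStrategy to change the behavior.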
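One reading of the timings in the question, offered as an assumption rather than something this thread confirms: the consumer makes its progress visible to the producer only after it has finished a whole batch. Under that assumption the numbers can be reproduced with a small plain-Java model. BatchProgressSketch and handlerCallsBeforeClaim are hypothetical names, the batch boundaries are taken from the slow run's output (batches [0] and [1..7]), and the per-event variant is only a hypothetical comparison. Sequence 9 maps to index 9 & 7 = 1, the second slot, so it needs sequence 1 to be reported as consumed.

```java
// Model of when a producer can reuse a slot; assumes batch-end progress reporting.
final class BatchProgressSketch {

    static final long HANDLER_MILLIS = 500; // cost of one onEvent call in the question

    // Number of handler calls that must finish before 'sequence' can be claimed.
    // batchEnds: last sequence of each batch, in order (from the observed output).
    // reportPerEvent: true  = progress is reported after every event (hypothetical),
    //                 false = progress is reported only when a batch is done (assumed).
    static long handlerCallsBeforeClaim(long sequence, int bufferSize,
                                        long[] batchEnds, boolean reportPerEvent) {
        long needed = sequence - bufferSize; // must be reported as consumed first
        if (needed < 0) {
            return 0; // the buffer has not wrapped yet, so there is no waiting
        }
        if (reportPerEvent) {
            return needed + 1; // events 0..needed have been handled
        }
        for (long batchEnd : batchEnds) {
            if (batchEnd >= needed) {
                return batchEnd + 1; // progress becomes visible only at the batch end
            }
        }
        throw new IllegalArgumentException("sequence " + needed + " is not in any batch");
    }

    public static void main(String[] args) {
        long[] observedBatchEnds = {0, 7};
        long slot8 = handlerCallsBeforeClaim(8, 8, observedBatchEnds, false);
        long slot9 = handlerCallsBeforeClaim(9, 8, observedBatchEnds, false);
        long slot9PerEvent = handlerCallsBeforeClaim(9, 8, observedBatchEnds, true);
        System.out.println("slot 8 claimable after ~" + slot8 * HANDLER_MILLIS + " ms");
        System.out.println("slot 9 waits a further ~"
                + (slot9 - slot8) * HANDLER_MILLIS + " ms");
        System.out.println("slot 9 with per-event reporting: a further ~"
                + (slot9PerEvent - slot8) * HANDLER_MILLIS + " ms");
    }
}
```

In this model slot 8 becomes claimable after one handler call (about 500 ms) and slot 9 after seven more (about 3500 ms), close to the 505 ms and 3519 ms in the output. With per-event reporting the second wait would be about 500 ms instead.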
