disruptor

Posted v4ki5mqu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了disruptor相关的知识,希望对你有一定的参考价值。

核心组件:

RingBuffer:环形数组,存放生产者消费者使用的对象,对象复用。
Sequence:是disruptor对队列中表示位置的下标位置的抽象。用填充的方式解决伪共享问题。
Sequencer:是用来协调生产者进度和消费者进度的。消费者不能跑到生产者前面去了,生产者也不能超过消费者一圈。有3个重要的参数,cursor表示的生产者的位置,gatingSequences表示的是末端消费者的位置,waitstrategy表示当没有数据给消费者时,消费者的等待行为
SequenceBarrier:协调消费者的进度和它依赖的进度的。这里说依赖是因为消费者本身是有层级的,第一层的消费者依赖(不超过)生产者的进度,第二层的消费者依赖(不超过)第一层的消费进度。从构造方法可以看出当传入一个长度为0的dependentSequences数组时,该barrier的dependentSequence就是生产者的位置。如果大于0就用FixedSequenceGroup包装一下dependentSequences数组,FixedSequenceGroup的get方法返回的就是dependentSequences数组的最小值

源码解析:
SingleProducerSequencer.next 获取生产的可用下标

public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }
    // 上一次的返回的seq
    long nextValue = this.nextValue;
    // 这一次应该返回的seq
    long nextSequence = nextValue + n;
    // 上一次这个位置的seq = 这一次的seq-bufferSize
    // 目的是为了看这个seq的slot有没有被消费,能不能使用
    long wrapPoint = nextSequence - bufferSize;
    // 获取消费者未消费的元素的seq最小值,这个值不是实时的,因为这个需要遍历所有消费者获取最小的seq,代价较大
    long cachedGatingSequence = this.cachedValue;
    // wrapPoint > cachedGatingSequence,检查将要被放入元素的slot是否已经没有消费者占用了
    // cachedGatingSequence > nextValue 消费进度超过生产进度 异常情况 序号被重置回老的的oldSequence值
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        long minSequence;
        // 获取实时的消费最小seq
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
        {
            // 唤醒所有消费者
            waitStrategy.signalAllWhenBlocking();
            // park一下 自旋
            LockSupport.parkNanos(1L); 
        }
        // 将最小值缓存起来
        this.cachedValue = minSequence;
    }
    // 更新nextValue 返回
    this.nextValue = nextSequence;
    return nextSequence;
}

MultiProducerSequencer.next获取生产的可用下标

    public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;

    do {
        current = cursor.get();
        next = current + n;

        long wrapPoint = next - bufferSize;
        // 有多个生产者,都会操作gatingSequenceCache,
        // gatingSequenceCache相当于SingleProducerSequencer里的cachedValue的Atomic版
        // 保证线程安全和满足可见性
        long cachedGatingSequence = gatingSequenceCache.get();

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

            if (wrapPoint > gatingSequence) {
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1);
                continue;
            }

            gatingSequenceCache.set(gatingSequence);
        }
        // 用cas更新cursor
        else if (cursor.compareAndSet(current, next)) {
            break;
        }
    }
    while (true);

    return next;
}

SingleProducerSequencer.publish 推送数据

@Override
public void publish(long sequence){
    cursor.set(sequence);//更新RingBuffer的游标
    waitStrategy.signalAllWhenBlocking();//给消费者发送signal信号,具体怎么做与waitStrategy的实现有关
}

MultiProducerSequencer.publish 推送数据

@Override
public void publish(final long sequence) {
    setAvailable(sequence);//这里用了一个额外的availableBuffer数组,来标记RingBuffer的某个slot是否已经被publish成功,后面生产者消费的时候会用到
    waitStrategy.signalAllWhenBlocking();//给消费者发送signal信号,具体怎么做与waitStrategy的实现有关
}
单生产者publish一个seq,那么这个seq之前所有的seq都被publish了

多生产者publish一个seq,那么只有这一个seq被publish,之前的seq可能只是被某个生产者拿到了,发送动作并不一定完成了。多生产者是先更新cursor,然后发送,发送成功后setAvailable,这个seq的数据才能被消费者消费。如果A拿到14,B拿到15,B setAvailable(15),A没有set,此时消费者无法消费到数据15,但是cursor已经是15了。只有这个seq之前的数据全部setAvailable才能被消费。

单消费者启动 BatchEventProcessor.run()主要部分就是这个while,不断的去获取可消费的seq,然后调用eventHandler.onEvent处理数据

    ...........
     while (true) {
            try {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);

                while (nextSequence <= availableSequence) {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                sequence.set(availableSequence);
            } 
    ...........
    

多消费者启动 workProcessor.run()主要部分就是这个while,先cas抢到seq的处理权,然后去询问可处理的seq,如果seq大于cas抢到的,代表可以处理了,workHandler.onEvent(event)处理数据,然后再去cas抢夺seq的处理权

public void run() {
    ..............
    while (true) {
        try {
            if (processedSequence) {
                processedSequence = false;
                do {
                    nextSequence = workSequence.get() + 1L;
                    sequence.set(nextSequence - 1L);//保存自己当前的处理位置
                } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                //二阶段提交的核心逻辑,通过cas+自旋的方式获取消费者公有序列器+1的位置,
                // 如果cas成功则说明拿到了对应位置事件的处理权
            }
            // 说明已经等到了自己需要的数据就绪
            // 操作自己抢到锁的这个event
            if (cachedAvailableSequence >= nextSequence) {
                event = ringBuffer.get(nextSequence);
                workHandler.onEvent(event);//执行业务逻辑handler
                processedSequence = true;
            } else {
                //根据sequenceBarrier的等待策略等待自己要执行的序列,如果等待失败,会返回最新被写入的事件下标
                //sequenceBarrier可能会抛出AlertException异常,这个异常会在调用了Processor停止方法后被抛出
                cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
            }
...........

BatchEventProcessor: 重复消费处理器,使用同一个SequenceBarrier的一批BatchEventProcessor会对自己的sequence到SequenceBarrier.waitFor(sequence)数据都消费一遍。每个BatchEventProcessor都会消费一遍。对于一个实体来说,有多少个BatchEventProcessor就会被消费多少遍。
WorkerPool:非重复处理器,一个WorkerPool中的所有workHander共同使用一个sequence,对于sequence到SequenceBarrier.waitFor(sequence)只会消费一边,workHander之间抢夺。对于一个实体来说,只会被消费一遍。

ProcessingSequenceBarrier.waitFor()

public long waitFor()(final long sequence)throws AlertException, InterruptedException, TimeoutException {
    checkAlert();
    // 调用消费者的waitStrategy来等待sequence变得可用
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

    if (availableSequence < sequence){
        return availableSequence;
    }
    // 从RingBuffer中找到最大的已经被publish事件的slot,寻找策略根据单生产者/多生产者有不同
    return sequencer.getHighestPublishedSequence(sequence,availableSequence);
}

SingleProducerSequencer.getHighestPublishedSequence()//单生产者的情况,简单

public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
    return availableSequence;
}

MultiProducerSequencer.getHighestPublishedSequence()//多生产者

public long getHighestPublishedSequence(long lowerBound, long availableSequence){
    for (long sequence = lowerBound; sequence <= availableSequence; sequence++){//从lowerBound开始遍历,在RingBuffer中找到最大的已经被publish事件的slot
        if (!isAvailable(sequence)){
            return sequence - 1;
        }
    }
    return availableSequence;
}

对于SingleProducerSequencer由于是发布时更新cursor,因此可以直接返回availableSequence;对于MultiProducerSequencer是在availableBuffer的[lowerBound,availableSequence]区间上找到最小的已发布位置。

waitStrategy.waitFor 获取可消费的seq
BlockingWaitWaitStrategy的实现如下:

private final Lock lock = new ReentrantLock();//锁与关联的条件
private final Condition processorNotifyCondition = lock.newCondition();

@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException {
    long availableSequence;
    if (cursorSequence.get() < sequence)//如果生产者还没有生产出足够的事件,那么在锁上等待
    {
        lock.lock();//尝试占有锁
        try {
            while (cursorSequence.get() < sequence) {
                barrier.checkAlert();
                processorNotifyCondition.await();//等待
            }
        } finally {
            lock.unlock();//释放锁
        }
    }
    // 如果自己的前置依赖没有消费 就不断自旋
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
    }

    return availableSequence;
}

@Override
public void signalAllWhenBlocking() { // 这个在生产者push的时候会用到
    lock.lock();
    try {
        processorNotifyCondition.signalAll();//唤醒所有在锁上等待的生产者
    } finally {
        lock.unlock();
    }
}

以上是关于disruptor的主要内容,如果未能解决你的问题,请参考以下文章

disruptor的并行用法

如何优雅地使用Disruptor

为Disruptor 写的一个简单实用的.Net扩展

从 LinkedBlockingQueue 迁移到 LMAX 的 Disruptor

disruptor架构三 使用场景更加复杂的场景

disruptor笔记之四:事件消费知识点小结