disruptor
Posted v4ki5mqu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了disruptor相关的知识,希望对你有一定的参考价值。
核心组件:
RingBuffer:环形数组,存放生产者消费者使用的对象,对象复用。
Sequence:是disruptor对队列中表示位置的下标位置的抽象。用填充的方式解决伪共享问题。
Sequencer:是用来协调生产者进度和消费者进度的。消费者不能跑到生产者前面去了,生产者也不能超过消费者一圈。有3个重要的参数,cursor表示的生产者的位置,gatingSequences表示的是末端消费者的位置,waitstrategy表示当没有数据给消费者时,消费者的等待行为
SequenceBarrier:协调消费者的进度和它依赖的进度的。这里说依赖是因为消费者本身是有层级的,第一层的消费者依赖(不超过)生产者的进度,第二层的消费者依赖(不超过)第一层的消费进度。从构造方法可以看出当传入一个长度为0的dependentSequences数组时,该barrier的dependentSequence就是生产者的位置。如果大于0就用FixedSequenceGroup包装一下dependentSequences数组,FixedSequenceGroup的get方法返回的就是dependentSequences数组的最小值
源码解析:
SingleProducerSequencer.next 获取生产的可用下标
public long next(int n) {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
// 上一次的返回的seq
long nextValue = this.nextValue;
// 这一次应该返回的seq
long nextSequence = nextValue + n;
// 上一次这个位置的seq = 这一次的seq-bufferSize
// 目的是为了看这个seq的slot有没有被消费,能不能使用
long wrapPoint = nextSequence - bufferSize;
// 获取消费者未消费的元素的seq最小值,这个值不是实时的,因为这个需要遍历所有消费者获取最小的seq,代价较大
long cachedGatingSequence = this.cachedValue;
// wrapPoint > cachedGatingSequence,检查将要被放入元素的slot是否已经没有消费者占用了
// cachedGatingSequence > nextValue 消费进度超过生产进度 异常情况 序号被重置回老的的oldSequence值
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
long minSequence;
// 获取实时的消费最小seq
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
// 唤醒所有消费者
waitStrategy.signalAllWhenBlocking();
// park一下 自旋
LockSupport.parkNanos(1L);
}
// 将最小值缓存起来
this.cachedValue = minSequence;
}
// 更新nextValue 返回
this.nextValue = nextSequence;
return nextSequence;
}
MultiProducerSequencer.next获取生产的可用下标
public long next(int n) {
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do {
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
// 有多个生产者,都会操作gatingSequenceCache,
// gatingSequenceCache相当于SingleProducerSequencer里的cachedValue的Atomic版
// 保证线程安全和满足可见性
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence) {
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1);
continue;
}
gatingSequenceCache.set(gatingSequence);
}
// 用cas更新cursor
else if (cursor.compareAndSet(current, next)) {
break;
}
}
while (true);
return next;
}
SingleProducerSequencer.publish 推送数据
@Override
public void publish(long sequence){
cursor.set(sequence);//更新RingBuffer的游标
waitStrategy.signalAllWhenBlocking();//给消费者发送signal信号,具体怎么做与waitStrategy的实现有关
}
MultiProducerSequencer.publish 推送数据
@Override
public void publish(final long sequence) {
setAvailable(sequence);//这里用了一个额外的availableBuffer数组,来标记RingBuffer的某个slot是否已经被publish成功,后面生产者消费的时候会用到
waitStrategy.signalAllWhenBlocking();//给消费者发送signal信号,具体怎么做与waitStrategy的实现有关
}
单生产者publish一个seq,那么这个seq之前所有的seq都被publish了
多生产者publish一个seq,那么只有这一个seq被publish,之前的seq可能只是被某个生产者拿到了,发送动作并不一定完成了。多生产者是先更新cursor,然后发送,发送成功后setAvailable,这个seq的数据才能被消费者消费。如果A拿到14,B拿到15,B setAvailable(15),A没有set,此时消费者无法消费到数据15,但是cursor已经是15了。只有这个seq之前的数据全部setAvailable才能被消费。
单消费者启动 BatchEventProcessor.run()主要部分就是这个while,不断的去获取可消费的seq,然后调用eventHandler.onEvent处理数据
...........
while (true) {
try {
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence) {
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
...........
多消费者启动 workProcessor.run()主要部分就是这个while,先cas抢到seq的处理权,然后去询问可处理的seq,如果seq大于cas抢到的,代表可以处理了,workHandler.onEvent(event)处理数据,然后再去cas抢夺seq的处理权
public void run() {
..............
while (true) {
try {
if (processedSequence) {
processedSequence = false;
do {
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L);//保存自己当前的处理位置
} while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
//二阶段提交的核心逻辑,通过cas+自旋的方式获取消费者公有序列器+1的位置,
// 如果cas成功则说明拿到了对应位置事件的处理权
}
// 说明已经等到了自己需要的数据就绪
// 操作自己抢到锁的这个event
if (cachedAvailableSequence >= nextSequence) {
event = ringBuffer.get(nextSequence);
workHandler.onEvent(event);//执行业务逻辑handler
processedSequence = true;
} else {
//根据sequenceBarrier的等待策略等待自己要执行的序列,如果等待失败,会返回最新被写入的事件下标
//sequenceBarrier可能会抛出AlertException异常,这个异常会在调用了Processor停止方法后被抛出
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
...........
BatchEventProcessor: 重复消费处理器,使用同一个SequenceBarrier的一批BatchEventProcessor会对自己的sequence到SequenceBarrier.waitFor(sequence)数据都消费一遍。每个BatchEventProcessor都会消费一遍。对于一个实体来说,有多少个BatchEventProcessor就会被消费多少遍。
WorkerPool:非重复处理器,一个WorkerPool中的所有workHander共同使用一个sequence,对于sequence到SequenceBarrier.waitFor(sequence)只会消费一边,workHander之间抢夺。对于一个实体来说,只会被消费一遍。
ProcessingSequenceBarrier.waitFor()
public long waitFor()(final long sequence)throws AlertException, InterruptedException, TimeoutException {
checkAlert();
// 调用消费者的waitStrategy来等待sequence变得可用
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence){
return availableSequence;
}
// 从RingBuffer中找到最大的已经被publish事件的slot,寻找策略根据单生产者/多生产者有不同
return sequencer.getHighestPublishedSequence(sequence,availableSequence);
}
SingleProducerSequencer.getHighestPublishedSequence()//单生产者的情况,简单
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
return availableSequence;
}
MultiProducerSequencer.getHighestPublishedSequence()//多生产者
public long getHighestPublishedSequence(long lowerBound, long availableSequence){
for (long sequence = lowerBound; sequence <= availableSequence; sequence++){//从lowerBound开始遍历,在RingBuffer中找到最大的已经被publish事件的slot
if (!isAvailable(sequence)){
return sequence - 1;
}
}
return availableSequence;
}
对于SingleProducerSequencer由于是发布时更新cursor,因此可以直接返回availableSequence;对于MultiProducerSequencer是在availableBuffer的[lowerBound,availableSequence]区间上找到最小的已发布位置。
waitStrategy.waitFor 获取可消费的seq
BlockingWaitWaitStrategy的实现如下:
private final Lock lock = new ReentrantLock();//锁与关联的条件
private final Condition processorNotifyCondition = lock.newCondition();
@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException {
long availableSequence;
if (cursorSequence.get() < sequence)//如果生产者还没有生产出足够的事件,那么在锁上等待
{
lock.lock();//尝试占有锁
try {
while (cursorSequence.get() < sequence) {
barrier.checkAlert();
processorNotifyCondition.await();//等待
}
} finally {
lock.unlock();//释放锁
}
}
// 如果自己的前置依赖没有消费 就不断自旋
while ((availableSequence = dependentSequence.get()) < sequence) {
barrier.checkAlert();
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking() { // 这个在生产者push的时候会用到
lock.lock();
try {
processorNotifyCondition.signalAll();//唤醒所有在锁上等待的生产者
} finally {
lock.unlock();
}
}
以上是关于disruptor的主要内容,如果未能解决你的问题,请参考以下文章