Java 无锁队列 Disruptor,内存队列的生产解决方案

Posted BBinChina

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 无锁队列 Disruptor,内存队列的生产解决方案相关的知识,希望对你有一定的参考价值。

背景

Disruptor是英国外汇交易所LMAX开源的用于生产交易中的内存队列。

为了实现高性能交易撮合队列时,现在普遍的交易撮合引擎都采用了内存队列的方式,这种方式减少了持久化过程中带来的磁盘IO延迟,可以提交整体的交易性能。

Disruptor便是这样场景中诞生的,在实际使用过程中,LMAX基于Disruptor开发的系统单线程能支撑每秒600万订单。

Java队列

内存队列通常用于内存共享场景下的,而共享内存与锁有必然分不开的情缘,在java内置的线程安全的队列有以下:

队列有界性数据结构
ArrayBlockingQueuebounded加锁arraylist
LinkedBlockingQueueoptionally-bounded加锁linkedlist
ConcurrentLinkedQueueunbounded无锁linkedlist
LinkedTransferQueueunbounded无锁linkedlist
PriorityBlockingQueueunbounded加锁heap
DelayQueueunbounded加锁heap

ConcurrentLinkedQueue以及LinkedTransferQueue的无锁都是通过原子变量compare and swap(以下简称“CAS”)操作的。

为什么要无锁

现实编程过程中,加锁通常会严重地影响性能。线程会因为竞争不到锁而被挂起,等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。

Disruptor论文中讲述了一个实验:

这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
机器环境:2.4G 6核
运算: 64位的计数器累加5亿次
其测试结果:

MethodTime (ms)
Single thread300
Single thread with CAS5,700
Single thread with lock10,000
Single thread with volatile write4,700
Two threads with CAS30,000
Two threads with lock224,000

实际开发过的应该也知道这个结论:不加锁是最快的。

虽然知道无锁最快,但是我们核心还是要解决地是共享数据多线程安全访问呀(单线程能处理那么单线程最好了,不如actor模型,每个actor就是一个独占线程,反正永不停歇,那就没必要线程切换,保证效能)。

下面是ArrayBlockingQueue通过加锁的方式实现的offer方法,保证线程安全。

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            insert(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

下面是AtomicInteger的getAndAdd方法。CAS是CPU的一个指令,由CPU保证原子性。

CAS

/**
 * Atomically adds the given value to the current value.
 *
 * @param delta the value to add
 * @return the previous value
 */
public final int getAndAdd(int delta) {
    for (;;) {
        int current = get();
        int next = current + delta;
        if (compareAndSet(current, next))
            return current;
    }
}
  
/**
 * Atomically sets the value to the given updated value
 * if the current value {@code ==} the expected value.
 *
 * @param expect the expected value
 * @param update the new value
 * @return true if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
} 

Disruptor的设计方案

Disruptor通过以下设计来解决队列速度慢的问题:

  1. 环形数组结构
    为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
  2. 元素位置定位
    数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
  3. 无锁设计
    每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

Demo

package com.meituan.Disruptor;

/**
 * @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端
 */
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ThreadFactory;


public class DisruptorMain
{
    public static void main(String[] args) throws Exception
    {
        // 队列中的元素
        class Element {

            private int value;

            public int get(){
                return value;
            }

            public void set(int value){
                this.value= value;
            }

        }

        // 生产者的线程工厂
        ThreadFactory threadFactory = new ThreadFactory(){
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread");
            }
        };

        // RingBuffer生产工厂,初始化RingBuffer的时候使用
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        // 处理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>(){
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch)
            {
                System.out.println("Element: " + element.get());
            }
        };

        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        // 指定RingBuffer的大小
        int bufferSize = 16;

        // 创建disruptor,采用单生产者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        // 设置EventHandler
        disruptor.handleEventsWith(handler);

        // 启动disruptor的线程
        disruptor.start();

        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

        for (int l = 0; true; l++)
        {
            // 获取下一个可用位置的下标
            long sequence = ringBuffer.next();  
            try
            {
                // 返回可用位置的元素
                Element event = ringBuffer.get(sequence); 
                // 设置该位置元素的值
                event.set(l); 
            }
            finally
            {
                ringBuffer.publish(sequence);
            }
            Thread.sleep(10);
        }
    }
}

以上是关于Java 无锁队列 Disruptor,内存队列的生产解决方案的主要内容,如果未能解决你的问题,请参考以下文章

高性能队列Disruptor使用入门,原理和代码实现

深入理解Disruptor

深入理解Disruptor

高性能无锁并发框架Disruptor,太强了

聊一聊disruptor-无锁并发框架

高性能无锁队列,代码注释