Disruptor 使用简介

Posted catlkb

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Disruptor 使用简介相关的知识,希望对你有一定的参考价值。

【开发总结】Disruptor 使用简介

在极客时间看到王宝令老师关于 Disruptor 的一篇文章,觉得很有意思。看完之后又在网上找到一些其他关于Disruptor 的资料看了一下。

现在写篇文章总结一下。

使用

Disruptor 百度翻译是干扰者,分裂器的意思。
在这里它其实是一个高性能队列,一个queue。所以我有点想不通为什么名字取成这样。有清楚的同学可以知会我一生。

Disruptor 的使用相对Java集合类中的队列,会更加复杂。

第一步,引入jar包.

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

第二步,生成 Disruptor 对象

第三步,设置队列中消息消费的handler.

第四步,启动 Disruptor 线程。

第五步,获取ringbuffer。生产者通过向 Disruptor 的ringbuffer 来发布消息的。所以事先要先获取ringbuffer。

第六步,发布消息。

/**
 * @description:
 * @author: lkb
 * @create: 2020-10-28 19:46
 */
@Slf4j
public class MyTest {


    public static void main(String[] args) {
        //指定RingBuffer大小,
        //必须是2的N次方
        int bufferSize = 1024;

        //构建Disruptor
        Disruptor<LongEvent> disruptor
                = new Disruptor<>(
                LongEvent::new,
                bufferSize,
                DaemonThreadFactory.INSTANCE);

        //注册事件处理器
        disruptor.handleEventsWith(
                (event, sequence, endOfBatch) ->
                        System.out.println("E: " + event));

        //启动Disruptor
        disruptor.start();

        //获取RingBuffer
        RingBuffer<LongEvent> ringBuffer
                = disruptor.getRingBuffer();
        //生产Event
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            //生产者生产消息
            ringBuffer.publishEvent(
                    (event, sequence, buffer) ->
                            event.set(buffer.getLong(0)), bb);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.error(e.getMessage(), e);
            }
        }
    }
}

其中LongEvent 是一个普通的POJO对象

public class LongEvent {
    private long value;
    public void set(long value) {
        this.value = value;
    }
}

Disruptor 的使用是典型的生产者-消费者模式。

Java集合中的队列更符合我们对队列的操作习惯。以ArrayBlockingQueue为例,我们可以把ArrayBlockingQueue想象为一个队列管道,生产者线程生产完数据后,将数据丢到队列中,消费者线程从另外一端取出数据,进行消费。

而Disruptor 相对其他传统的队列而言更像一个“大家长”,生成者需要通过这位“大家长”的ringbuffer将消息发送出去,消费者需要将处理操作注册到“大家长”这里。

这样的队列操作不太符合我们的习惯,所以使用上会不那么顺手。

高效的秘诀

在不顺手的情况下,为什么还是有很多系统用到它呢?原因在于它非常高效。

技术图片

上面是网上找到的性能对比图。可以看到Disruptor性能上是非常高的。
那它是如何实现高效的呢?

  1. 内存分配更加合理,使用 RingBuffer 数据结构,数组元素在初始化时一次性全部创建,提升缓存命中率;
  2. 对象循环利用,避免频繁 GC。
  3. 能够避免伪共享,提升缓存利用率。
  4. 采用无锁算法,避免频繁加锁、解锁的性能消耗。支持批量消费,消费者可以无锁方式消费多个消息。

对于第四点,相信大家都很清楚。锁操作涉及到操作系统状态切换,这个操作是非常耗时耗资源的。无锁操作可以避免状态切换。

对于前面三点,涉及到一个非常重要的概念,就是缓存。CPU有三级缓存。离CPU越近的缓存,速度越快,但是容量越小。因为CPU的速度远远大于其他硬件的速度,设置缓存能够减小CPU和其他硬件的速度差。这个缓存和生产者消费者中间的队列有异曲同工之妙。

为了提高缓存的命中率,硬件通过局部性原理,在加载一个数据的同时将它周围的数据也加载进去。

程序的局部性原理指的是在一段时间内程序的执行会限定在一个局部范围内。这里的“局部性”可以从两个方面来理解,一个是时间局部性,另一个是空间局部性。时间局部性指的是程序中的某条指令一旦被执行,不久之后这条指令很可能再次被执行;如果某条数据被访问,不久之后这条数据很可能再次被访问。而空间局部性是指某块内存一旦被访问,不久之后这块内存附近的内存也很可能被访问。

上诉的第1、2条,通过将数据设置进连续相邻的内存位置,CPU在读取了一个数据的时候,发现第二个数据已经因为“局部性”原理加载进缓存,就不需要再次去寻址,直接从缓存中获取数据。

第1,2条是对缓存的高效利用,第3条就是对缓存低效使用的规避。
有一种缓存低效使用的方式是“伪共享”。内存是按照缓存行进行管理的。缓存行的大小通常是64个字节。

技术图片

例如一个缓存行存储了两个对象,对其中一个对象的操作会使得整个缓存行失效。也就是说即使对象B被加入了缓存,但是因为其他对象的操作无效了。
第3条中,Disruptor 中通过将对象包裹,让一个对象充满整个缓存行,避免了伪共享的问题。

还有一点就是,相对于其他阻塞队列,Disruptor 的等待策略更多,功能更加强大。

通过对缓存的利用和无锁操作,Disruptor 成为一个高效队列。

一些思考

Disruptor 的一些思想其实在其他框架上也是常见的。

避免伪共享问题上,mysql 8.0 版本直接将查询缓存的整块功能删掉了;在高效利用缓存上,线程池、队列等都多算缓存概念的受益者;避免锁操作上,Java的底层的各种锁优化,也是利用这点,比如轻量级锁。

为什么这么多框架会不约而同地想到这些问题呢?

因为计算机、操作系统是非常成熟的,底层都是非常相似的架构。了解计算机底层原理,对这些知识才能触类旁通。所以,啥不说,计算机基础课,我打算再上一遍。





以上是关于Disruptor 使用简介的主要内容,如果未能解决你的问题,请参考以下文章

disruptor框架

Disruptor并发框架

StormLog4j2高性能之—Disruptor队列

高性能队列Disruptor使用入门,原理和代码实现

LMAX Disruptor 最简单实际的示例代码

如何优雅地使用Disruptor