队列之王: Disruptor 原理架构源码 一文穿透

Posted 40岁资深老架构师尼恩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了队列之王: Disruptor 原理架构源码 一文穿透相关的知识,希望对你有一定的参考价值。

文章很长,而且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录 博客园版 为您奉上珍贵的学习资源 :

免费赠送 :《尼恩Java面试宝典》 持续更新+ 史上最全 + 面试必备 2000页+ 面试必备 + 大厂必备 +涨薪必备
免费赠送 经典图书:《Java高并发核心编程(卷1)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《Java高并发核心编程(卷2)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《Java高并发核心编程(卷3)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:尼恩Java面试宝典 最新版 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 资源宝库: Java 必备 百度网盘资源大合集 价值>10000元 加尼恩领取


disruptor 红宝书目的:

作为Java领域最高性能的 队列,没有之一, 大家不光要懂,而是 需要深入骨髓的搞懂

所以,给大家奉上了本书,并且配备了视频进行 详细介绍, 目的:

帮助,大家穿透 一个 绝对核心高性能 Java高性能的 队列 的架构和原理,让 面试题 五体投地、顶礼膜拜

文章目录

队列之王 Disruptor 简介

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。

基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。

2011年,企业应用软件专家Martin Fowler专门撰写长文介绍Disruptor。

2011年,Disruptor还获得了Oracle官方的Duke大奖

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

要深入了解 disruptor ,咱们从 Java的 内置队列开始介绍起。

Java内置队列的问题

介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。

Java的内置队列如下表所示。

队列有界性数据结构
ArrayBlockingQueuebounded加锁arraylist
LinkedBlockingQueueoptionally-bounded加锁linkedlist
ConcurrentLinkedQueueunbounded无锁linkedlist
LinkedTransferQueueunbounded无锁linkedlist
PriorityBlockingQueueunbounded加锁heap
DelayQueueunbounded加锁heap

队列的底层一般分成三种:数组、链表和堆。

其中,堆一般情况下是为了实现带有优先级特性的队列,

暂时不做介绍,后面讲netty 定时任务的时候,再介绍。

从数组和链表两种数据结构来看,两类结构如下:

  • 基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;
  • 基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者通过原子变量compare and swap(以下简称“CAS”)这种无锁方式来实现的。

和ConcurrentLinkedQueue一样,上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的

但是,对 volatile类型的变量进行 CAS 操作,存在伪共享问题,

Disruptor 的使用场景

Disruptor 它可以用来作为高性能的有界内存队列, 适用于两大场景:

  • 生产者消费者场景
  • 发布订阅 场景

生产者消费者场景。Disruptor的最常用的场景就是“生产者-消费者”场景,对场景的就是“一个生产者、多个消费者”的场景,并且要求顺序处理。

备注,这里和JCTool 的 MPSC 队列,刚好相反, MPSC 使用于多生产者,单消费者场景

发布订阅 场景:Disruptor也可以认为是观察者模式的一种实现, 实现发布订阅模式。

当前业界开源组件使用Disruptor的包括Log4j2、Apache Storm等,

1 前置知识:伪共享原理与实操

在介绍 无锁框架 disruptor 之前,作为前置的知识,首先给大家介绍 伪共享 原理&性能对比实战 。

CPU的结构

下图是计算的基本结构。

L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。

  • L1缓存很小但很快,并且紧靠着在使用它的CPU内核;
  • L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;
  • L3更大、更慢,并且被单个插槽上的所有CPU核共享;
  • 最后是主存,由全部插槽上的所有CPU核共享。

级别越小的缓存,越接近CPU, 意味着速度越快且容量越少。

L1是最接近CPU的,它容量最小**(比如256个字节)**,速度最快,

每个核上都有一个L1 Cache(准确地说每个核上有两个L1 Cache, 一个存数据 L1d Cache, 一个存指令 L1i Cache);

L2 Cache 更大一些**(比如256K个字节)**,速度要慢一些,一般情况下每个核上都有一个独立的L2 Cache;

二级缓存就是一级缓存的存储器:

一级缓存制造成本很高因此它的容量有限,二级缓存的作用就是存储那些CPU处理时需要用到、一级缓存又无法存储的数据。

L3 Cache是三级缓存中最大的一级,例如**(比如12MB个字节)**,同时也是最慢的一级,在同一个CPU插槽之间的核共享一个L3 Cache。

三级缓存和内存可以看作是二级缓存的存储器,它们的容量递增,但单位制造成本却递减

L3 Cache和L1,L2 Cache有着本质的区别。

L1和L2 Cache都是每个CPU core独立拥有一个,而L3 Cache是几个Cores共享的,可以认为是一个更小但是更快的内存。

缓存行 cache line

为了提高IO效率,CPU每次从内存读取数据,并不是只读取我们需要计算的数据,而是一批一批去读取的,这一批数据,也叫Cache Line(缓存行)。

也可以理解为批量读取,提升性能。 为啥要一批、一批的读取呢? 这也满足 空间的局部性原理(具体请参见葵花宝典)。

从读取的角度来说,缓存,是由缓存行Cache Line组成的。

所以使用缓存时,并不是一个一个字节使用,而是一行缓存行、一行缓存行这样使用;

换句话说,CPU存取缓存都是按照一行,为最小单位操作的。并不是按照字节为单位,进行操作的。

一般而言,读取一行数据时,是将我们需要的数据周围的连续数据一次性全部读取到缓存中。这段连续的数据就称为一个缓存行

一般一行缓存行有64字节。intel处理器的缓存行是64字节。目前主流的CPU Cache的Cache Line大小都是64Bytes。

假设我们有一个512 Bytes 的一级缓存,那么按照64 Bytes 的缓存单位大小来算,这个一级缓存所能存放的缓存个数就是512/64 = 8个。

所以,Cache Line可以简单的理解为CPU Cache中的最小缓存单位。

这些CPU Cache的写回和加载,都不是以一个变量作为单位。这些都是以整个Cache Line作为单位。

如果一个常量和变量放在一行,那么变量的更新,也会影响常量的使用:

CPU在加载数据时,整个缓存行过期了,加载常量的时候,自然也会把这个数据从内存加载到高速缓存。

什么是 伪共享(False Sharing)问题?

提前说明: 翻译 有瑕疵, 伪共享(False Sharing), 应该翻译为 “错共享”, 才更准确

CPU的缓存系统是以缓存行(cache line)为单位存储的,一般的大小为64bytes。

在多线程程序的执行过程中,存在着一种情况,多个需要频繁修改的变量存在同一个缓存行当中。

假设:有两个线程分别访问并修改X和Y这两个变量,X和Y恰好在同一个缓存行上,这两个线程分别在不同的CPU上执行。

那么每个CPU分别更新好X和Y时将缓存行刷入内存时,发现有别的修改了各自缓存行内的数据,这时缓存行会失效,从L3中重新获取。

这样的话,程序执行效率明显下降。

为了减少这种情况的发生,其实就是避免X和Y在同一个缓存行中,

如何操作呢?可以主动添加一些无关变量将缓存行填充满,

比如在X对象中添加一些变量,让它有64 Byte那么大,正好占满一个缓存行。

两个线程(Thread1 和 Thread2)同时修改一个同一个缓存行上的数据 X Y:

如果线程1打算更改a的值,而线程2准备更改b的值:

Thread1:x=3;

Thread2:y=2;

由x值被更新了,所以x值需要在线程1和线程2之间传递(从线程1到线程2),

x、y的变更,都会引起 cache line 整块 64 bytes 被刷新,因为cpu核之间以cache line的形式交换数据(cache lines的大小一般为64bytes)。

在并发执行的场景下,每个线程在不同的核中被处理。

假设 x,y是两个频繁修改的变量,x,y,还位于同一个缓存行.

如果,CPU1修改了变量x时,L3中的缓存行数据就失效了,也就是CPU2中的缓存行数据也失效了,CPU2需要的y需要重新从内存加载。

如果,CPU2修改了变量y时,L3中的缓存行数据就失效了,也就是CPU1中的缓存行数据也失效了,CPU1需要的x需要重新从内存加载。

x,y在两个cpu上进行修改,本来应该是互不影响的,但是由于缓存行在一起,导致了相互受到了影响。

伪共享问题(False Sharing)的本质

出现伪共享问题(False Sharing)的原因:

  • 一个缓存行可以存储多个变量(存满当前缓存行的字节数);64个字节可以放8个long,16个int
  • 而CPU对缓存的修改又是以缓存行为最小单位的; 不是以long 、byte这样的数据类型为单位的
  • 在多线程情况下,如果需要修改“共享同一个缓存行的其中一个变量”,该行中其他变量的状态 就会失效,甚至进行一致性保护

所以,伪共享问题(False Sharing)的本质是:

对缓存行中的单个变量进行修改了,导致整个缓存行其他不相关的数据也就失效了,需要从主存重新加载

如果 其中有 volatile 修饰的变量,需要保证线程可见性的变量,还需要进入 缓存与数据一致性的保障流程, 如mesi协议的数据一致性保障 用了其他变量的 Core的缓存一致性。

缓存一致性是根据缓存行为单元来进行同步的,假如 y是 volatile 类型的,假如a修改了x,而其他的线程用到y,虽然用到的不是同一个数据,但是他们(数据X和数据Y)在同一个缓存行中,其他的线程的缓存需要保障数据一致性而进行数据同步,当然,同步也需要时间。

一个CPU核心在加载一个缓存行时要执行上百条指令。如果一个核心要等待另外一个核心来重新加载缓存行,那么他就必须等在那里,称之为stall(停止运转)。

伪共享问题 的解决方案

减少伪共享也就意味着减少了stall的发生,其中一个手段就是通过填充(Padding)数据的形式,来保证本应有可能位于同一个缓存行的两个变量,在被多线程访问时必定位于不同的缓存行。

简单的说,就是 以空间换时间: 使用占位字节,将变量的所在的 缓冲行 塞满。

disruptor 无锁框架就是这么干的。

一个缓冲行填充的例子

下面是一个填充了的缓存行的,尝试 p1, p2, p3, p4, p5, p6为AtomicLong的value的缓存行占位,将AtomicLong的value变量的所在的 缓冲行 塞满,

代码如下:

package com.crazymakercircle.demo.cas;

import java.util.concurrent.atomic.AtomicLong;

public class PaddedAtomicLong extends AtomicLong 
    private static final long serialVersionUID = -3415778863941386253L;

    /**
     * Padded 6 long (48 bytes)
     */
    public volatile long p1, p2, p3, p4, p5, p6 = 7L;

    /**
     * Constructors from @link AtomicLong
     */
    public PaddedAtomicLong() 
        super();
    

    public PaddedAtomicLong(long initialValue) 
        super(initialValue);
    

    /**
     * To prevent GC optimizations for cleaning unused padded references
     */
    public long sumPaddingToPreventOptimization() 
        return p1 + p2 + p3 + p4 + p5 + p6;
    


例子的部分结果如下:

printable = com.crazymakercircle.basic.demo.cas.busi.PaddedAtomicLong object internals:
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
      0     4        (object header)                           01 00 00 00 (00000001 00000000 00000000 00000000) (1)
      4     4        (object header)                           00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4        (object header)                           50 08 01 f8 (01010000 00001000 00000001 11111000) (-134150064)
     12     4        (alignment/padding gap)                  
     16     8   long AtomicLong.value                          0
     24     8   long PaddedAtomicLong.p1                       0
     32     8   long PaddedAtomicLong.p2                       0
     40     8   long PaddedAtomicLong.p3                       0
     48     8   long PaddedAtomicLong.p4                       0
     56     8   long PaddedAtomicLong.p5                       0
     64     8   long PaddedAtomicLong.p6                       7

Instance size: 72 bytes
Space losses: 4 bytes internal + 0 bytes external = 4 bytes total

伪共享False Sharing在java 8中解决方案

JAVA 8中添加了一个@Contended的注解,添加这个的注解,将会在自动进行缓存行填充。

下面有一个@Contended的例子:

package com.crazymakercircle.basic.demo.cas.busi;
import sun.misc.Contended;
public class ContendedDemo

    //有填充的演示成员
    @Contended
    public volatile long padVar;

    //没有填充的演示成员
    public volatile long notPadVar;



以上代码使得padVar和notPadVar都在不同的cache line中。@Contended 使得notPadVar字段远离了对象头部分。

printable = com.crazymakercircle.basic.demo.cas.busi.ContendedDemo object internals:
 OFFSET  SIZE   TYPE DESCRIPTION               VALUE
      0     4        (object header)           01 00 00 00 (00000001 00000000 00000000 00000000) (1)
      4     4        (object header)           00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4        (object header)           50 08 01 f8 (01010000 00001000 00000001 11111000) (-134150064)
     12     4        (alignment/padding gap)  
     16     8   long ContendedDemo.padVar      0
     24     8   long ContendedDemo.notPadVar   0
Instance size: 32 bytes
Space losses: 4 bytes internal + 0 bytes external = 4 bytes total

执行时,必须加上虚拟机参数-XX:-RestrictContended,@Contended注释才会生效。

很多文章把这个漏掉了,那样的话实际上就没有起作用。

新的结果;

printable = com.crazymakercircle.basic.demo.cas.busi.ContendedDemo object internals:
 OFFSET  SIZE   TYPE DESCRIPTION               VALUE
      0     4        (object header)           01 00 00 00 (00000001 00000000 00000000 00000000) (1)
      4     4        (object header)           00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4        (object header)           50 08 01 f8 (01010000 00001000 00000001 11111000) (-134150064)
     12     4        (alignment/padding gap)  
     16     8   long ContendedDemo.notPadVar   0
     24   128        (alignment/padding gap)  
    152     8   long ContendedDemo.padVar      0
    160   128        (loss due to the next object alignment)
Instance size: 288 bytes
Space losses: 132 bytes internal + 128 bytes external = 260 bytes total

在Java 8中,使用@Contended注解的对象或字段的前后各增加128字节大小的padding,使用2倍于大多数硬件缓存行的大小来避免相邻扇区预取导致的伪共享冲突。我们目前的缓存行大小一般为64Byte,这里Contended注解为我们前后加上了128字节绰绰有余。

注意:如果想要@Contended注解起作用,需要在启动时添加JVM参数-XX:-RestrictContended 参数后 @sun.misc.Contended 注解才有。

可见至少在JDK1.8以上环境下, 只有@Contended注解才能解决伪共享问题, 但是消耗也很大, 占用了宝贵的缓存, 用的时候要谨慎。

另外:

@Contended 注释还可以添加在类上,每一个成员,都会加上。

伪共享性能比对实操:结论,差6倍

三个实操:

  • 首先存在伪共享场景下的 耗时计算
  • 其次是消除伪共享场景下的 耗时计算
  • 再次是使用unsafe访问变量时的耗时计算

存在伪共享场景下的 耗时计算

entity类

并行的执行数据修改,这里抽取成为了一个通用的方法

测试用例

执行的时间

消除伪共享场景下的 耗时计算

entity类

测试用例

消除伪共享场景下的 耗时计算 (550ms)

使用unsafe访问变量的耗时计算

entity

使用unsafe访问变量的耗时计算:

54ms

性能总结

消除伪共享场景 ,比存在伪共享场景 的性能 , 性能提升 6倍左右

实验数据,从 3000ms 提升 到 500ms

使用 unsafe 取消内存可见性,比消除伪共享场景 ,性能提升 10 倍左右

实验数据,从 500ms 提升 到 50ms

通过实验的对比, 可见Java 的性能,是可以大大优化的,尤其在高性能组件

以上实操的 详细介绍 ,请参见 《100wqps 日志平台实操》

JDK源码中如何解决 伪共享问题

在LongAdder在java8中的实现已经采用了@Contended。

LongAdder以及 Striped64如何解决伪共享问题

LongAdder是大家常用的 高并发累加器

通过分而治之的思想,实现 超高并发累加。

LongAdder的 结构如下:

Striped64是在java8中添加用来支持累加器的并发组件,它可以在并发环境下使用来做某种计数,

Striped64的设计思路是在竞争激烈的时候尽量分散竞争,

Striped64维护了一个base Count和一个Cell数组,计数线程会首先试图更新base变量,如果成功则退出计数,否则会认为当前竞争是很激烈的,那么就会通过Cell数组来分散计数,

Striped64根据线程来计算哈希,然后将不同的线程分散到不同的Cell数组的index上,然后这个线程的计数内容就会保存在该Cell的位置上面,

基于这种设计,最后的总计数需要结合base以及散落在Cell数组中的计数内容。

这种设计思路类似于java7的ConcurrentHashMap实现,也就是所谓的分段锁算法,ConcurrentHashMap会将记录根据key的hashCode来分散到不同的segment上,

线程想要操作某个记录,只需要锁住这个记录对应着的segment就可以了,而其他segment并不会被锁住,其他线程任然可以去操作其他的segment,

这样就显著提高了并发度,

虽然如此,java8中的ConcurrentHashMap实现已经抛弃了java7中分段锁的设计,而采用更为轻量级的CAS来协调并发,效率更佳。

Cell元素如何消除伪共享

Striped64 中的Cell元素,是如何消除伪共享的呢?

可以打印一下 cell的 内存结构

当然,别忘记加上 vm 选项:-XX:-RestrictContended

对于伪共享,我们在实际开发中该怎么做?

通过上面大篇幅的介绍,我们已经知道伪共享的对程序的影响。

那么,在实际的生产开发过程中,我们一定要通过缓存行填充去解决掉潜在的伪共享问题吗?

其实并不一定。

首先就是多次强调的,伪共享是很隐蔽的,我们暂时无法从系统层面上通过工具来探测伪共享事件。

其次,不同类型的计算机具有不同的微架构(如 32 位系统和 64 位系统的 java 对象所占自己数就不一样),如果设计到跨平台的设计,那就更难以把握了,一个确切的填充方案只适用于一个特定的操作系统。

还有,缓存的资源是有限的,如果填充会浪费珍贵的 cache 资源,并不适合大范围应用。

3 Disruptor框架是如何解决伪共享问题的?

在Disruptor中有一个重要的类Sequence,该类包装了一个volatile修饰的long类型数据value,

Sequence的结构和源码

无论是Disruptor中的基于数组实现的缓冲区RingBuffer,还是生产者,消费者,都有各自独立的Sequence,

Sequence的用途是啥呢?

  • 在RingBuffer缓冲区中,Sequence标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用RingBuffer.next()来获得下一个可使用的相对位置。
  • 对于生产者和消费者来说,Sequence标示着它们的事件序号。

Sequence的结构图如下

来看看Sequence类的源码:

  class LhsPadding 
	protected long p1, p2, p3, p4, p5, p6, p7;


class Value extends LhsPadding 
	protected volatile long value;


class RhsPadding extends Value 
	protected long p9, p10, p11, p12, p13, p14, p15;


public class Sequence extends RhsPadding 
	static final long INITIAL_VALUE = -1L;
	private static final Unsafe UNSAFE;
	private static final long VALUE_OFFSET;
	static 
		UNSAFE = Util.getUnsafe();
		try 
			VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
		 catch(final Exception e) 
			 throw new RuntimeException(e);
		
	
	


public Sequence() 
	this(INITIAL_VALUE);


public Sequence(final long initialValue) 
	UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);



2:Disruptor 的 使用实战

我们从一个简单的例子开始学习Disruptor:

生产者传递一个long类型的值给消费者,而消费者消费这个数据的方式仅仅是把它打印出来。

定义一个Event和工厂

首先定义一个Event来包含需要传递的数据:

public class LongEvent  
    private long value;
    public long getValue()  
        return value; 
     
 
    public void setValue(long value)  
        this.value = value; 
     
 

由于需要让Disruptor为我们创建事件,我们同时还声明了一个EventFactory来创建Event对象。

public class LongEventFactory implements EventFactory  
    @Override 
    public Object newInstance()  
        return new LongEvent(); 
     
 

定义事件处理器(消费者)

我们还需要一个事件消费者,也就是一个事件处理器。

这个例子中,事件处理器的工作,就是简单地把事件中存储的数据打印到终端:

    /** 
     * 类似于消费者
     *  disruptor会回调此处理器的方法
     */
    static class LongEventHandler implements EventHandler<LongEvent> 
        @Override
        public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception 
            System.out.println(longEvent.getValue());
        
    

disruptor会回调此处理器的方法

定义事件源(生产者)

事件都会有一个生成事件的源,类似于 生产者的角色,

如何产生事件,然后发出事件呢?

通过从 环形队列中 获取 序号, 通过序号获取 对应的 事件对象, 将数据填充到 事件对象,再通过 序号将 事件对象 发布出去。

一段生产者的代码如下:

    //  事件生产者:业务代码
    // 通过从 环形队列中 获取 序号, 通过序号获取 对应的 事件对象, 将数据填充到 事件对象,再通过 序号将 事件对象 发布出去。
    static class LongEventProducer 
        private final RingBuffer以上是关于队列之王: Disruptor 原理架构源码 一文穿透的主要内容,如果未能解决你的问题,请参考以下文章

Disruptor源码分析

高性能队列Disruptor使用入门,原理和代码实现

高性能并发队列Disruptor

高性能并发队列Disruptor

单机最快的队列Disruptor解析和使用

Disruptor极速队列