优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析
Posted 李浩宇Alex
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析相关的知识,希望对你有一定的参考价值。
Disruptor原理
首先Disruptor是为了解决高并发缓存的队列,为线程间通讯提供高效的性能,它是如何做到无阻塞、多生产、多消费的?
上图简单的画了一下构建Disruptor的各个参数以及 ringBuffer 的构造,下面简单的说一下。
生产者需要组件
- Event模型:从生产者传递给消费者的数据单位,完全由用户定义其类型。
@Data
public class SampleEvent {
private Long id。
private String sampleDataStr。
}
- EventFactory:创建事件(任务)的工厂类。(这里任务会创建好,保存在内存中,可以看做是一个空任务)。
public class SampleEventFactory implements EventFactory<SampleEvent> {
@Override
public SampleEvent newInstance() {
// 实例化数据(建好空数据,等待后面初始化)
return new SampleEvent()。
}
}
-
RingBuffer:环形缓冲区通常被认为是Disruptor的主要实现,当前版本即3.0版本之后,RingBuffer仅负责存储和更新通过Disruptor的数据(Event)。
- ringBufferSize:容器的长度。( Disruptor 的核心容器是 ringBuffer,环转数组,有限长度)。
-
ProductType:生产者类型:单生产者、多生产者。
- Sequencer:Sequencer是Disruptor的核心API。该接口的2个实现类(SingleProducer,MultiProducer)实现了所有并发算法,用于在生产者和消费者之间快速,正确地传递数据。
-
WaitStrategy:等待策略。(当队列里的数据都被消费完之后,消费者和生产者之间的等待策略),等待策略确定消费者如何等待生产者将事件放入Disruptor。
- RingBuffer:存放数据的容器。
@Data
@AllArgsConstructor
public class SampleEventProducer {
private RingBuffer<OrderEvent> ringBuffer。
public void sendData(long id) {
//获取下一个可用序号
long sequence = ringBuffer.next()。
try {
//获取一个空对象(没有填充值)
SampleEvent sampleEent = ringBuffer.get(sequence)。
}finally {
//提交
ringBuffer.publish(sequence)。
}
}
}
消费者需要组件
-
Executor:消费者线程池,执行任务的线程。(每一个消费者都需要从线程池里获得线程去消费任务)。
-
EventProcessor:用于处理来自Disruptor的事件的主事件循环,并具有消费者序列的所有权。有一个名为 BatchEventProcessor表示,它包含事件循环的有效实现,并将回调到使用的提供的EventHandler接口实现。
-
EventHandler:事件处理器,由用户实现并代表Disruptor的使用者的接口,用户客户端实现消息的处理机制,由客户端具体实现。
public class SampleEventHandler implements EventHandler<SampleEvent> { /** * 事件驱动监听--消费者消费的主体 */ @Override public void onEvent(SampleEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println(event.getSampleDataStr() + " " +Thread.currentThread().getName())。 } }
算法核心Sequence序号
-
Sequence:Disruptor使用Sequences作为识别特定组件所在位置的方法。
-
每个消费者(EventProcessor)都像Disruptor本身一样维护一个Sequence。大多数并发代码依赖于这些Sequence值的变化或者叫移动,因此Sequence支持AtomicLong的许多当前功能。
- 事实上,唯一真正的区别是Sequence包含额外的功能,以防止序列和其他值之间的错误共享。
-
- Sequence Barrier:序列屏障由Sequencer产生,包含对Sequencer中主要发布的sequence和任何依赖性消费者的序列的引用。它包含确定是否有任何可供消费者处理的事件的逻辑。
Disruptor的优点:
-
多线程之间没有竞争即没有锁。
-
所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结构。
- 每个对象中都能跟踪序列号(ring buffer, claim strategy,生产者和消费者),加上神奇的缓存行填充,就意味着没有伪共享和非预期的竞争。
下面再简单介绍下RingBuffer核心实现,来看看队列的实现细节。
其为环形队列,有点像一致性Hash算法中的闭环,但完全不一样。
底层的话是一个固定大小的数组结构,相比于队列来说,其只有一个下标指针cursor,如果槽的个数是2的N次方更有利于基于二进制的计算机进行计算。如果看过HashMap源码应该知道,HashMap定位元素槽时使用了一种巧妙的方式,hash&(length-1)。
RingBuffer同样是相同的计算方式,sequence&(length-1),当然你可以进行取模操作。
- 取模操作在寄存器中的计算,需要多次的迭代加操作进行的,所以相对于计算速度来说,对于计算机进行位运算效率绝对是高于取模操作的,尤其是对于高并发状况下的计算,能够节省很多单位cpu开销。
一般实现线性存储有两种实现方式:
- 一种是基于连续内存分配的HashTable
- 一种是基于随机内存分配的迭代指针。
为什么RingBuffer选用数组作为存储结构,而不选用链表存储?
缓存或者程序的局部性原理
-
(Good)数组内存属是连续分配内存的预读策略,也就是内存加载时,会将部分连续内存地址预先加载到高速缓存中,即认为你可能会使用,上面我们分析了操作系统中的cpu操作数据的流程,可以看出这种设计是为了不用反复从内存中加载。
- (Bad)链表的内存分配是碎片化的所以其存储地址不是连续的,导致每次都会cpu都会重新计算下一个链表位置的地址,并从内存中加载相关的数据,数据量小的情况下并不能看出性能的优劣,但是当数据量大的情况下,这种极小的消耗,会对整体的运行效率产生影响。
伪共享
内存以高速缓存行的形式存储在高速缓存系统中。高速缓存行是2的N次方个连续字节,其大小通常为32-256,最常见的缓存行大小为64字节。
-
首先我们知道对于锁来说是关中断实现,锁定bus消息总线实现,而对于共享内存,计算机使用的是缓存行,共享变量的多个线程,共享相同的缓存行。
- 实现线程数量的线性可伸缩性,我们必须确保没有两个线程写入同一个变量或缓存行。而当使用volatile的时候,我们读取直接共享变量从主内存或者叫共享内存中读取变量的值,其本质是使计算机缓存行失效。
在CPU核心A运行的线程想要更新变量X,而CPU核心B上的线程想要更新变量Y。
这两个热变量位于同一缓存行中。每个线程都将竞争缓存行的所有权,以便他们可以更新它。如果核心A获得所有权,那么MESI/MOSI缓存子系统将需要使核心B的相应缓存行无效。反之也是一样,极大地影响性能。如果竞争核心在不同的套接字上并且还必须跨越套接字互连,则缓存行问题将进一步加剧。
- 特别是不同的线程操作同一个缓存行,需要发出RFO(Request for Owner)信号锁定缓存行,保证写操作的原子性,此时其他线程不能操作这个缓存行,这将对效率有极大的影响。
缓存行填充的概念
下面是缓存行实现,另外缓存行填充有一个前提同时分配的对象往往位于同一位置。
public long p1, p2, p3, p4, p5, p6, p7; // cache line padding
private volatile long cursor = INITIAL_CURSOR_VALUE;
public long p8, p9, p10, p11, p12, p13, p14; // cache line padding
如果有不同的消费者往不同的字段写入,你需要确保各个字段间不会出现伪共享。
/**
* 数组保存了VolatileLongPadding,其中数组中一个long类型保存数组长度,算上
* 自身long类型value,需要再填充6个long类型,就能将数组中的对象填充满一个缓存行。
* 注意:这里使用继承的方式实现缓存行对齐,因为Java编译器会优化无效的字段。
*/
class CacheLinePadding {
// 如果不需要填充,只需要注释掉这段代码即可
public volatile long p1, p2, p3, p4, p5, p6;
}
class CacheLinePaddingObject extends CacheLinePadding {
//实际操作的值
public volatile long value = 0L;
}
以上是关于优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析的主要内容,如果未能解决你的问题,请参考以下文章
优化技术专题「线程间的高性能消息框架」终极关注Disruptor的核心源码和Java8的@Contended伪共享指南
分布式技术专题线程间的高性能消息框架-深入浅出Disruptor的使用和原理