Exchanger详解
Posted truestoriesavici01
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Exchanger详解相关的知识,希望对你有一定的参考价值。
Exchanger详解
简介
当一个线程到达栅栏时,会检查是否有其他线程已经到达栅栏.
若没有,则该线程进入等待.
若有,则与等待的其他线程交换各自的数据,然后继续执行.
原理
- 内部类
Participant
继承自ThreadLocal
,用来保存线程本地变量Node
. Node
存储用于单槽交换和多槽交换的字段.
单槽位交换(slot exchange)
流程:
- 首先到达的线程:
- 将
slot
字段指向自身的Node
节点,表示槽位已被占用. - 该线程自旋一段时间.若经过一段时间自旋还是没有配对的线程到达,则进入阻塞.(自旋减少上下文切换的开销)
- 将
- 后续到达的线程:
- 此时槽位
slot
已被占用.则后续的线程将槽位slot
清空,取出Node
中的item
作为交换的数据. - 后续的线程把自身的数据存入
Node
中的match
字段中,并唤醒先到达的线程.
- 此时槽位
- 先到达的线程被唤醒:
- 检查
match
是否为空.不为空则退出自旋,将match
中的数据返回.
- 检查
多槽位交换(arena exchange)
触发机制:
在单槽位交换中,若:多个匹配线程竞争修改slot
槽位,导致线程CAS修改slot
失败,则初始化arena
多槽位数组,后续的交换使用多槽位交换.
流程:
- 若槽不为空,则已有线程到达并等待.
- 获取已到达先携带的数据.
- 将当前线程携带的数据交换给已到达的线程.
- 唤醒已到达的线程.
- 若槽位有效且为空.
- CAS占用槽位成功.
- 通过
spin->yield->block
的锁升级方式进行优化的等待其他线程到达.若有线程到达,则交换数据后返回交换后的数据. - 若没有等待配对的线程,则阻塞的线程.
- 无效的槽位,需要扩容.
- 通过CAS方式对数组进行扩容.
注:
数组是连续的内存地址空间.多个slot会被加载到同一个缓存行上.
当一个slot
改变时,导致该slot
所在的缓存行上所有的数据都无效,需要重新从内存加载.
不同版本的差异
- JDK5被设计为容量为1的容器,存放一个等待的线程.当另外一个线程到达时,交换数据后会清空容器.
- JDK6后提供多个
slot
,增加并发执行的吞吐量.
示例
生产者-消费者模式
示例:
public class ProducerAndConsumer {
@Test
public void test() throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<>();
new Thread(new Producer(exchanger)).start();
new Thread(new Consumer(exchanger)).start();
TimeUnit.SECONDS.sleep(20);
}
}
class Producer implements Runnable {
private int[] array = new int[5];
private final Exchanger exchanger;
public Producer(Exchanger exchanger){
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
TimeUnit.SECONDS.sleep(2);
array[i] = i;
System.out.println("生产者生产的数据为: " + array[i]);
int exchange = (int) exchanger.exchange(array[i]);
System.out.println("生产者交换后的数据为: "+exchange);
System.out.println("生产者的对应的数据是否改变: " + array[i]);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private int[] array = new int[5];
private final Exchanger exchanger;
public Consumer(Exchanger exchanger){
this.exchanger = exchanger;
}
@Override
public void run() {
int index = 0;
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("消费者交换前的数据为: " + array[index]);
int exchange = (int) exchanger.exchange(array[index]);
index++;
System.out.println("消费者获得的数据为: " + exchange);
System.out.println("消费者交换后的数据: " + array[index]);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
注:
- 交换数据后并不改变线程内部的数据.交换数据在于获取对方的数据.是否替换用来交换局部数据需要自行设定.
参考:
以上是关于Exchanger详解的主要内容,如果未能解决你的问题,请参考以下文章