Exchanger详解

Posted truestoriesavici01

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Exchanger详解相关的知识,希望对你有一定的参考价值。

Exchanger详解

简介

当一个线程到达栅栏时,会检查是否有其他线程已经到达栅栏.
若没有,则该线程进入等待.
若有,则与等待的其他线程交换各自的数据,然后继续执行.

原理

  • 内部类Participant继承自ThreadLocal,用来保存线程本地变量Node.
  • Node存储用于单槽交换多槽交换的字段.

单槽位交换(slot exchange)

技术图片

流程:

  • 首先到达的线程:
    • slot字段指向自身的Node节点,表示槽位已被占用.
    • 该线程自旋一段时间.若经过一段时间自旋还是没有配对的线程到达,则进入阻塞.(自旋减少上下文切换的开销)
  • 后续到达的线程:
    • 此时槽位slot已被占用.则后续的线程将槽位slot清空,取出Node中的item作为交换的数据.
    • 后续的线程把自身的数据存入Node中的match字段中,并唤醒先到达的线程.
  • 先到达的线程被唤醒:
    • 检查match是否为空.不为空则退出自旋,将match中的数据返回.

多槽位交换(arena exchange)

触发机制:
在单槽位交换中,若:多个匹配线程竞争修改slot槽位,导致线程CAS修改slot失败,则初始化arena多槽位数组,后续的交换使用多槽位交换.

流程:

  1. 若槽不为空,则已有线程到达并等待.
    • 获取已到达先携带的数据.
    • 将当前线程携带的数据交换给已到达的线程.
    • 唤醒已到达的线程.
  2. 若槽位有效且为空.
    • CAS占用槽位成功.
    • 通过spin->yield->block的锁升级方式进行优化的等待其他线程到达.若有线程到达,则交换数据后返回交换后的数据.
    • 若没有等待配对的线程,则阻塞的线程.
  3. 无效的槽位,需要扩容.
    • 通过CAS方式对数组进行扩容.

注:
数组是连续的内存地址空间.多个slot会被加载到同一个缓存行上.
当一个slot改变时,导致该slot所在的缓存行上所有的数据都无效,需要重新从内存加载.

不同版本的差异

  • JDK5被设计为容量为1的容器,存放一个等待的线程.当另外一个线程到达时,交换数据后会清空容器.
  • JDK6后提供多个slot,增加并发执行的吞吐量.

示例

生产者-消费者模式

示例:

public class ProducerAndConsumer {
    @Test
    public void test() throws InterruptedException {
        Exchanger<Integer> exchanger = new Exchanger<>();
        new Thread(new Producer(exchanger)).start();
        new Thread(new Consumer(exchanger)).start();
        TimeUnit.SECONDS.sleep(20);
    }
}

class Producer implements Runnable {
    private int[] array = new int[5];
    private final Exchanger exchanger;

    public Producer(Exchanger exchanger){
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            try {
                TimeUnit.SECONDS.sleep(2);
                array[i] = i;
                System.out.println("生产者生产的数据为: " + array[i]);
                int exchange = (int) exchanger.exchange(array[i]);
                System.out.println("生产者交换后的数据为: "+exchange);
                System.out.println("生产者的对应的数据是否改变: " + array[i]);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{
    private int[] array = new int[5];
    private final Exchanger exchanger;

    public Consumer(Exchanger exchanger){
        this.exchanger = exchanger;
    }
    @Override
    public void run() {
        int index = 0;
        while (true) {
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("消费者交换前的数据为: " + array[index]);
                int exchange = (int) exchanger.exchange(array[index]);
                index++;
                System.out.println("消费者获得的数据为: " + exchange);
                System.out.println("消费者交换后的数据: " + array[index]);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

注:

  • 交换数据后并不改变线程内部的数据.交换数据在于获取对方的数据.是否替换用来交换局部数据需要自行设定.

参考:






以上是关于Exchanger详解的主要内容,如果未能解决你的问题,请参考以下文章

一行Java代码实现两玩家交换装备并发编程

一行Java代码实现两玩家交换装备并发编程

java 多线程 29 :多线程组件之 Exchanger

学习了解 Exchanger - 实现生产者消费者模型

Exchanger兄弟线程间数据信息交换

一行Java代码实现游戏中交换装备