并发编程从零开始-同步工具类

Posted 会编程的老六

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程从零开始-同步工具类相关的知识,希望对你有一定的参考价值。

并发编程从零开始(十)-同步工具类

6 同步工具类

6.1 Semaphore

Semaphore也就是信号量,提供了资源数量的并发访问控制,其使用代码很简单,如下所示:

有参方法tryAcquire(long timeout, TimeUnit unit)的作用是在指定的时间内尝试地获取1个许可,如果获取不到就返回false。

可以使用Semphore实现一个简易的抢座:

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 5; i++) {
            Thread.sleep(500);
            new MySemphore(semaphore).start();
        }

    }
}


public class MySemphore extends Thread{
    private Semaphore semaphore;

    public MySemphore(Semaphore semaphore){
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+": working");
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()+": releasing");
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

如下图所示,假设有n个线程来获取Semaphore里面的10份资源(n > 10),n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。

当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为如此,Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分。Semaphore相关类的继承体系如下图所示:

由于Semaphore和锁的实现原理基本相同。资源总数即state的初始值,在acquire里对state变量进行CAS减操作,减到0之后,线程阻塞;在release里对state变量进行CAS加操作。


6.2 CountDownLatch

6.2.1 CountDownLatch使用场景

假设一个主线程要等待5个 Worker 线程执行完才能退出,可以使用CountDownLatch来实现:

线程:

public class MyThread extends Thread{
    private final CountDownLatch countDownLatch;
    private final Random random = new Random();

    public MyThread(String name ,CountDownLatch countDownLatch){
        super(name);
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 运行结束");
        countDownLatch.countDown();
    }
}

Main类:

public class Main {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            new MyThread("THREAD"+i,countDownLatch).start();
        }
        //当前线程等待
        countDownLatch.await();
        System.out.println("程序运行结束");
    }
}

下图为CountDownLatch相关类的继承层次,CountDownLatch原理和Semaphore原理类似,同样是基于AQS,不过没有公平和非公平之分。

6.2.2 await()实现分析

await()调用的是AQS 的模板方法,CountDownLatch.Sync重新实现了tryAccuqireShared方法。

从tryAcquireShared(...)方法的实现来看,只要state != 0,调用await()方法的线程便会被放入AQS的阻塞队列,进入阻塞状态。

6.2.3 countDown()实现分析

countDown()调用的AQS的模板方法releaseShared(),里面的tryReleaseShared(...)由CountDownLatch.Sync实现。从上面的代码可以看出,通过CAS减少state的值,只有state=0,tryReleaseShared(...)才会返回true,然后执行doReleaseShared(...),一次性唤醒队列中所有阻塞的线程。

总结:由于是基于AQS阻塞队列来实现的,所以可以让多个线程都阻塞在state=0条件上,通过countDown()一直减state,减到0后一次性唤醒所有线程。如下图所示,假设初始总数为MN个线程await(),M个线程countDown(),减到0之后,N个线程被唤醒。


6.3 CyclicBarrier-循环屏障

6.3.1 CyclicBarrier 使用场景

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做 的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一 个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier使用方式比较简单:

//创建CyclicBarrier
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                // 当所有线程被唤醒时,执行Runnable。
                System.out.println("do something");
            }
        });
        
//线程进入到屏障进行阻塞
cyclicBarrier.await();

实现阶段运行:

Main:

public class Main {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                System.out.println("finish");
            }
        });
        for (int i = 0; i < 3; i++) {
            new MyThread(cyclicBarrier).start();
        }
    }
}

MyThread:

public class MyThread extends Thread{
    private final CyclicBarrier cyclicBarrier;
    private final Random random = new Random();

    public MyThread(CyclicBarrier cyclicBarrier){
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(random.nextInt(2000));
            System.out.println(Thread.currentThread().getName()+" A begin");
            cyclicBarrier.await();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName()+" B begin");
            cyclicBarrier.await();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName()+" C begin");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

在整个过程中,有2个同步点。只有所有的线程全部到达了同步点之后,最后一个进入的线程将唤醒所有被阻塞的线程。

6.3.2 CyclicBarrier实现原理

CyclicBarrier基于ReentrantLock+Condition实现

//该内部类用于表明当前循环屏障的状态,当broken为true时表示障碍器发生了异常
    private static class Generation {
        boolean broken = false;
    }
    //CyclicBarrier内部的显示锁
    private final ReentrantLock lock = new ReentrantLock();
    //通过上面的显式锁得到的Condition变量,障碍器能够阻塞和唤醒多个线程完全得益于这个Condition
    private final Condition trip = lock.newCondition();
    //临界值,当障碍器阻塞的线程数等于parties时即count=0,障碍器将会通过trip唤醒目前所有阻塞的线程
    private final int parties;
    //条件线程,当屏障被打破时,在障碍器通过trip唤醒所有正被阻塞的的线程之前,执行该线程,这个线程可以充当一个主线程,那些被阻塞的线程可以充当子线程,即可以实现当所有子线程都达到屏障时调用主线程的作用
    private final Runnable barrierCommand;
    //内部类Generation变量表示当前循环屏障CyclicBarrier的状态
    private Generation generation = new Generation();
    //计数器,用于计算还剩多少个线程还没有达到屏障处,初始值应该等于临界值parties
    private int count;

构造方法:

await()方法实现过程:

    //timed:表示是否设置了等待时间
    //nanos等待的时间(纳秒)
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //使用CyclicBarrier定义的显示锁,加锁避免并发问题
        lock.lock();
        try {
            //当前循环屏障的状态
            final Generation g = generation;
            //如果为true,表示障碍器之前发生了异常,抛出异常BrokenBarrierException
            if (g.broken)
                throw new BrokenBarrierException();
            //当前线程是否被中断
            if (Thread.interrupted()) {
                breakBarrier();//该方法会重置计数值count为parties,并且唤醒所有被阻塞的线程,并改变状态Generation
                throw new InterruptedException();
            }
            //屏障计数器减一
            int index = --count;
            //如果index等于0 ,达到屏障的线程的数量等于最开始设置的数量parties
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //如果条件线程不为空,则执行条件线程
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //唤醒所有被阻塞的线程,并且重置计数器count,生成新的状态generation
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)//如果ranAction为true,表示上面的代码没有顺利执行结束,表示障碍器发生了异常,调用breakBarrier重置计数器,并设置generation.broken=true表示当前的状态
                        breakBarrier();
                }
            }
 
            // 当计数器为零调用了Condition的唤醒方法、或者broken为true、或者线程中断、或者等待超时时跳出异常
            for (;;) {
                try {
                    //阻塞当前线程,如果timed为false表示没有设置等待的时间
                    if (!timed)
                        //不限时阻塞线程,只有当调用唤醒方法后才会继续执行
                        trip.await();
                    else if (nanos > 0L)
                        //等待nanos毫秒
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //调用await方法如果发生异常,并且此时CyclicBarrier还没有调用nextGeneration()方法重置计数器和generation
                    if (g == generation && ! g.broken) {
                        breakBarrier();//该方法会唤醒所有阻塞的线程,并且重置计数器,而且设置generation.broken = true表示障碍器发生了异常。
                        throw ie;
                    } else {
                        //中断当前线程
                        Thread.currentThread().interrupt();
                    }
                }
                //g.broken为true,表示障碍器发生了异常,抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
                //index=0的唤醒操作顺利执行完了,所以通过nextGeneration()方法更新了generation,而由于generation是线程中的共享变量,所以当前线程此时 g!=generation
                if (g != generation)
                    return index;
                //如果timed为true表示设置了线程阻塞的时间,然后时间nanos却小于等于0,
                if (timed && nanos <= 0L) {
                    breakBarrier();//此时重置计数器,并且设置generation.broken=true表示障碍器发生异常
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
 
    //唤醒所有线程,重置计数器count,重新生成generation
    private void nextGeneration() {
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }
 
    //设置generation.broken=true表示障碍器发生的异常,重置计数器count,唤醒所有阻塞的线程
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

关于上面的方法,有几点说明:

  1. CyclicBarrier是可以被重用的。以应聘场景为例,来了10个线程,这10个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。

  2. CyclicBarrier 会响应中断。10 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()方法。然后count被重置为初始值(parties),重新开始。

  3. breakBarrier()只会被第10个线程执行1次(在唤醒其他9个线程之前),而不是10个线程每个都执行1次。


6.4 Exchanger

6.4.1 使用场景

Exchanger用于线程之间交换数据,其使用代码很简单,是一个exchange(...)方法,使用示例如下:

public class Main {
    private static final Random random = new Random();
    public static void main(String[] args) {
        // 建一个多线程共用的exchange对象
        // 把exchange对象传给3个线程对象。每个线程在自己的run方法中调用exchange,把自己的数据作为参数
        // 传递进去,返回值是另外一个线程调用exchange传进去的参数
        Exchanger<String> exchanger = new Exchanger<>();
        new Thread("线程1") {
            @Override
            public void run() {
                while (true) {
                    try {
                        // 如果没有其他线程调用exchange,线程阻塞,直到有其他线程调 用exchange为止。
                        String otherData = exchanger.exchange("交换数据1");
                        System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                        Thread.sleep(random.nextInt(2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        new Thread("线程2") {
            @Override
            public void run() {
                while (true) {
                    try {
                        String otherData = exchanger.exchange("交换数据2");
                        System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                        Thread.sleep(random.nextInt(2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        new Thread("线程3") {
            @Override
            public void run() {
                while (true) {
                    try {
                        String otherData = exchanger.exchange("交换数据3");
                        System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                        Thread.sleep(random.nextInt(2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
    }   
}

在上面的例子中,3个线程并发地调用exchange(...),会两两交互数据,如1/2、1/3和2/3。

6.4.2 实现原理

Exchanger的核心机制和Lock一样,也是CAS+park/unpark。

park/unpark推荐阅读:https://www.cnblogs.com/set-cookie/p/9582547.html

首先,在Exchanger内部,有两个内部类:Participant和Node,代码如下:

每个线程在调用exchange(...)方法交换数据的时候,会先创建一个Node对象。

这个Node对象就是对该线程的包装,里面包含了3个重要字段:第一个是该线程要交互的数据,第二个是对方线程交换来的数据,最后一个是该线程自身。

一个Node只能支持2个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Node,因此在Exchanger里面定义了Node数组:

private volatile Node[] arena;
6.4.3 exchange( V x )实现分析

一对一交换数据使用slotExchange,其余情况使用arenaExchange。

上面方法中,如果arena不是null,表示启用了arena方式交换数据。如果arena不是null,并且线程被中断,则抛异常。

如果arena不是null,但是arenaExchange的返回值为null,则抛异常。对方线程交换来的null值是封装为NULL_ITEM对象的,而不是null。

如果slotExchange的返回值是null,并且线程被中断,则抛异常。

如果slotExchange的返回值是null,并且areaExchange的返回值是null,则抛异常。

slotExchange的实现:

arenaExchange的实现:


6.5 Phaser-移相器

6.5.1 用Phaser代替CyclicBarrier 和 CountDownLatch

从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。

CyclicBarrier解决了CountDownLatch不能重用的问题,但是仍有以下不足:

1)不能动态调整计数器值,假如线程数不足以打破barrier,就只能reset或者多加些线程,在实际运用中显然不现实

2)每次await仅消耗1个计数器值,不够灵活

Phaser就是用来解决这些问题的。Phaser将多个线程协作执行的任务划分为多个阶段,每个阶段都可以有任意个参与者,线程可以随时注册并参与到某个阶段。

代替CountDownLatch:

public class PhaserInsteadOfCountDownLatch {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(5);
        for (int i = 0; i < 5; i++) {
            new Thread("thread-"+(i+1)){
                private final Random random = new Random();

                @Override
                public void run() {
                    System.out.println(getName()+" start");
                    try {
                        Thread.sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(getName()+" end");
                    phaser.arrive();
                }
            }.start();
        }
        System.out.println("threads start finish");
        phaser.awaitAdvance(phaser.getPhase());
        System.out.println("threads end finish");
    }
}

代替CyclicBarrier:

Main:

public class PhaserInsteadOfCyclicBarrier {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(5);
        for (int i = 0; i < 5; i++) {
            new MyThread(phaser).start();
        }
        phaser.awaitAdvance(0);
    }
}

MyThread:

public class MyThread extends Thread{
    private final Phaser phaser;
    private final Random random = new Random();

    public MyThread(Phaser phaser){
        this.phaser = phaser;
    }

    @Override
    public void run() {
        try {
            System.out.println("start a");
            Thread.sleep(500);
            System.out.println("end a");
            phaser.arriveAndAwaitAdvance();
            System.out.println("start b");
            Thread.sleep(500);
            System.out.println("end b");
            phaser.arriveAndAwaitAdvance();
            System.out.println("start c");
            Thread.sleep(500);
            System.out.println("end c");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

arriveAndAwaitAdance()就是 arrive()与 awaitAdvance(int)的组合,表示“我自己已到达这个同步点,同时要等待所有人都到达这个同步点,然后再一起前行”。

6.5.2 Phaser新特性
  1. 动态调整线程个数

    CyclicBarrier 所要同步的线程个数是在构造方法中指定的,之后不能更改,而 Phaser 可以在运行期间动态地调整要同步的线程个数。Phaser 提供了下面这些方法来增加、减少所要同步的线程个数。

  2. 层次Phaser

    多个Phaser可以组成如下图所示的树状结构,可以通过在构造方法中传入父Phaser来实现。

    public Phaser(Phaser parent, int parties){
    //....
    }
    

    通过parent节点来存储树状结构:

    private final Phaser parent;
    

可以发现,在Phaser的内部结构中,每个Phaser记录了自己的父节点,但并没有记录自己的子节点列表。所以,每个 Phaser 知道自己的父节点是谁,但父节点并不知道自己有多少个子节点,对父节点的操作,是通过子节点来实现的。

树状的Phaser怎么使用呢?考虑如下代码,会组成下图的树状Phaser。

Phaser root = new Phaser(2);
Phaser c1 = new Phaser(root,3);
Phaser c2 = new Phaser(root,2);
Phaser c3 = new Phaser(c1,0);

本来root有两个参与者,然后为其加入了两个子Phaser(c1,c2),每个子Phaser会算作1个参与者,root的参与者就变成2+2=4个。c1本来有3个参与者,为其加入了一个子Phaser c3,参与者数量变成3+1=4个。c3的参与者初始为0,后续可以通过调用register()方法加入。

对于树状Phaser上的每个节点来说,可以当作一个独立的Phaser来看待,其运作机制和一个单独的Phaser是一样的。

父Phaser并不用感知子Phaser的存在,当子Phaser中注册的参与者数量大于0时,会把自己向父节点注册;当子Phaser中注册的参与者数量等于0时,会自动向父节点解除注册。父Phaser把子Phaser当作一个正常参与的线程就即可。

6.5.3 state变量解析

大致了解了Phaser的用法和新特性之后,下面仔细剖析其实现原理。Phaser没有基于AQS来实现,但具备AQS的核心特性:state变量、CAS操作、阻塞队列。先从state变量说起。

private volatile long state;

这个64位的state变量被拆成4部分:

Phaser提供了一系列的成员方法来从state中获取上图中的几个数字。下面再看一下state变量在构造方法中是如何被赋值的:

其中,已经定义了:

private static final int EMPTY = 1;
private static final int PHASE_SHIFT = 32;
private static final int PARTIES_SHIFT = 16;

当parties=0时,state被赋予一个EMPTY常量,常量为1;

当parties != 0时,把phase值左移32位;把parties左移16位;然后parties也作为最低的16位,3个值做或操作,赋值给state。

6.5.4 阻塞与唤醒(Treiber Stack)

基于上述的state变量,对其执行CAS操作,并进行相应的阻塞与唤醒。如下图所示,右边的主线程会调用awaitAdvance()进行阻塞;左边的arrive()会对state进行CAS的累减操作,当未到达的线程数减到0时,唤醒右边阻塞的主线程。

在这里,阻塞使用的是一个称为Treiber Stack的数据结构,而不是AQS的双向链表。Treiber Stack是一个无锁的栈,它是一个单向链表,出栈、入栈都在链表头部,所以只需要一个head指针,而不需要tail指针,如下的实现:

为了减少并发冲突,这里定义了2个链表,也就是2个Treiber Stack。当phase为奇数轮的时候,阻塞线程放在oddQ里面;当phase为偶数轮的时候,阻塞线程放在evenQ里面。代码如下所示:

6.5.5 arrive() 方法分析

下面看arrive()方法是如何对state变量进行操作,又是如何唤醒线程的。

其中,定义了变量:

private static final int ONE_ARRIVAL = 1;
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
private static final int ONE_DEREGISTER = ONE_ARRIVAL\\ONE_PARTY;
private static final int PARTIES_SHIFT = 16;

arrive()和 arriveAndDeregister()内部调用的都是 doArrive(boolean)方法。

区别在于前者只是把“未达到线程数”减1;后者则把“未到达线程数”和“下一轮的总线程数”都减1。下面看一下doArrive(boolean)方法的实现。

关于上面的方法,有以下几点说明:

  1. 定义了2个常量如下。当 deregister=false 时,只最低的16位需要减 1,adj=ONE_ARRIVAL;当deregister=true时,低32位中的2个16位都需要减1,adj=ONE_ARRIVAL|ONE_PARTY。
  2. 把未到达线程数减1。减了之后,如果还未到0,什么都不做,直接返回。如果到0,会做2件事情:第1,重置state,把state的未到达线程个数重置到总的注册的线程数中,同时phase加 1;第2,唤醒队列中的线程。

下面看一下唤醒方法:

遍历整个栈,只要栈当中节点的phase不等于当前Phaser的phase,说明该节点不是当前轮的,而是前一轮的,应该被释放并唤醒。

6.5.6 awaitAdvance()方法分析

下面的while循环中有4个分支:

初始的时候,node==null,进入第1个分支进行自旋,自旋次数满足之后,会新建一个QNode节点;

之后执行第3、第4个分支,分别把该节点入栈并阻塞。

这里调用了ForkJoinPool.managedBlock(ManagedBlocker blocker)方法,目的是把node对应的线程阻塞。ManagerdBlocker是ForkJoinPool里面的一个接口,定义如下:

QNode实现了该接口,实现原理还是park(),如下所示。之所以没有直接使用park()/unpark()来实现阻塞、唤醒,而是封装了ManagedBlocker这一层,主要是出于使用上的方便考虑。一方面是park()可能被中断唤醒,另一方面是带超时时间的park(),把这二者都封装在一起。

理解了arrive()和awaitAdvance(),arriveAndAwaitAdvance()就是二者的一个组合版本。

Java并发编程学习6-同步工具类和并发容器

本篇开始将要介绍 Java 平台类库下的一些最常用的 并发基础构建模块,以及使用这些模块来构造并发应用程序时的一些常用模式。

同步容器类

同步容器类包括 VectorHashtable,还有由 Collections.synchronizedXxx 等工厂方法创建的同步的封装器类。

这些类实现线程安全性的方法是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。

1. 同步容器类的问题

同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。

容器里常见的复合操作包括:

  • 迭代(反复访问元素,直到遍历完容器中的所有元素)
  • 跳转(根据指定顺序找到当前元素的下一个元素)
  • 条件运算(例如“若没有则添加”)

下面假设我们在 Vector 中定义两个复合操作的方法:getLastdeleteLast,它们都会执行 “先检查后执行” 操作。

	public static Object getLast(Vector list) 
		int lastIndex = list.size() - 1;
		return list.get(lastIndex);
	
	
	public static void deleteLast(Vector list) 
		int lastIndex = list.size() - 1;
		list.remove(lastIndex);
	

上面定义的两个方法,看似没有任何问题,从某种程度上来看也的确如此,无论多少个线程同时调用它们,也不会破坏 Vector

但如果从调用者的角度去看,如果线程 A 在包含 10 个元素的 Vector 上调用 getLast,同时线程 B 在同一个 Vector 上调用 deleteLast,这些操作交替执行如下图所示,getLast 将抛出 ArrayIndexOutOfBoundsException 异常。这里虽然很好地遵循了 Vector 的规范(如果请求一个不存在的元素,那么将抛出一个异常),但这并不是调用者所希望看到的结果,除非 Vector 一开始就是空的。

同步容器类通过自身的锁来保护它的每个方法,因此只要获得容器类的锁,上面的 getLastdeleteLast 方法就可以成为原子操作。

下面看一下代码示例:

	public static Object getLast(Vector list) 
		synchronized (list) 
			int lastIndex = list.size() - 1;
			return list.get(lastIndex);
		
	
	
	public static void deleteLast(Vector list) 
		synchronized (list) 
			int lastIndex = list.size() - 1;
			list.remove(lastIndex);
		
	

getLast 一样,如果在对 Vector 进行迭代时,另一个线程删除了一个元素,并且这两个操作交替执行,那么这种迭代方法也将抛出 ArrayIndexOutOfBoundsException 异常。

	for (int i = 0; i < vector.size(); i++)
		doSomething(vector.get(i));

虽然上述迭代操作可能抛出异常,但并不意味着 Vector 就不是线程安全的。Vector 的状态仍然是有效的,而抛出的异常也与其规范保持一致。

像读取最后一个元素或者迭代这样的简单操作中抛出异常,显然是调用者不愿意看到的。我们可以通过在迭代期间持有 Vector 的锁,可以防止其他线程在迭代期间修改 Vector。当然这会导致其他线程在迭代期间无法访问它,从而降低了并发性。

	synchronized (vector) 
		for (int i = 0; i < vector.size(); i++)
			doSomething(vector.get(i));
	

2. 迭代器与 ConcurrentModificationException

在设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为是 “及时失败” 的。这意味着,当它们发现容器在迭代过程中被修改时,就会抛出一个 ConcurrentModificationException 异常。

这种 “及时失败” 的迭代器只能作为并发问题的预警指示器。如果在迭代期间计数器被修改,那么 hasNextnext 将抛出 ConcurrentModificationException。然而,这种检查是在没有同步的情况下进行的,因此可能会看到失效的计数值,而迭代器可能并没有意识到已经发生了修改。这是一种设计上的权衡,从而降低并发修改操作的检测代码对程序性能带来的影响。

下面我们看一个代码示例,使用 for-each 循环语法对 List 容器进行迭代。

	List<Person> personList = Collections.synchronizedList(new ArrayList<Person>());
	
	// 可能抛出 ConcurrentModificationException
	for (Person p : personList)
		doSomething(p);

从编译后的Class文件来看,上述 for-each 循环语法,javac 将生成使用 Iterator 的代码,反复调用 hasNextnext 来迭代 List 对象。 与迭代 Vector 一样,想要避免出现 ConcurrentModificationException,就必须在迭代过程中持有容器的锁。

如果不希望在迭代期间对容器加锁,那么可以“克隆”容器,并在副本上进行迭代。由于副本被封闭在线程内,因此其他线程不会在迭代期间对其进行修改,这样就避免了抛出 ConcurrentModificationException(在克隆过程中仍然需要对容器加锁)。

当然克隆容器存在显著的性能开销。这种方式的好坏,取决于容器的大小,在每个元素上执行的操作,迭代操作相对于容器其他操作的调用频率,以及在响应时间和吞吐量等方面的需求。

3. 隐藏迭代器

虽然加锁可以防止迭代器抛出 ConcurrentModificationException,但需要记住在所有对共享容器进行迭代的地方都需要加锁。

下面我们来看一个示例,在 HiddenIterator 中没有显式的容器迭代操作,但在 System.out.pringln 中将执行迭代操作。

	@NotThreadSafe
	public class HiddenIterator 
		@GuardedBy("this")
		private final Set<Integer> set = new HashSet<Integer>();
		
		public synchronized void add(Integer i) 
			set.add(i);
		
		
		public synchronized void remove(Integer i) 
			set.remove(i);
		
		
		public void addTenThings() 
			Random r = new Random();
			for (int i = 0; i < 10; i++)
				add(r.nextInt());
			// 隐藏在字符串连接中的迭代操作
			System.out.pringln("DEBUG: added ten elements to " + set);
		
	

上述 System.out.pringln 代码中,编译器将字符串的连接操作转换为调用 StringBuilder.append(Object),而这个方法又会调用容器的 toString 方法,标准容器的 toString 方法将迭代容器,并在每个元素上调用 toString 来生成容器内容的格式化表示。并发环境下,addTenThings 方法可能会抛出 ConcurrentModificationException

如果状态与保护它的同步代码之间相隔越远,那么开发人员就越容易忘记在访问状态时使用正确的同步。如果 HiddenIteratorsynchronizedSet 来包装 HashSet,并且对同步的代码进行封装,那么就不会发生这种错误。

正如封装对象的状态有助于维持不变性条件一样,封装对象的同步机制同样有助于确保实施同步策略。

除了 toString 对容器进行迭代,还有容器的 hashCodeequalscontainsAllremoveAllretainAll 等方法,以及把容器作为参数的构造函数,都会对容器进行迭代。所有这些间接的迭代操作都有可能抛出 ConcurrentModificationException

并发容器

上面提到的同步容器,它是将所有对容器状态的访问都串行化,以实现它们的线程安全性。这种方式的代价就是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低。

并发容器是针对多个线程并发访问而设计,如 ConcurrentHashMap,用于替代同步且基于散列的 Map;CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的 List。

通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。

1. ConcurrentHashMap

HashMap 一样,ConcurrentHashMap 也是一个基于散列的 Map, 但它使用了一种粒度更细的加锁机制来实现更大程度的共享,提供更高的并发性和伸缩性,这种机制称为分段锁(Lock Striping,以后的博文会讲解到)。在这种机制中,任意数量的读取线程可以并发地访问 Map,执行读取操作的线程和执行写入操作的线程可以并发地访问 Map,并且一定数量的写入线程可以并发地修改 Map

ConcurrentHashMap 与其他并发容器一起增强了同步容器类,有如下的特点:

  • 它们提供的迭代器不会抛出 ConcurrentModificationException,因此不需要再迭代过程中对容器加锁。
  • ConcurrentHashMap 返回的迭代器具有弱一致性(Weakly Consistent),而并非 ”及时失败“。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。

对于一些需要在整个 Map 上进行计算的方法,例如 sizeisEmpty,这些方法的语义被略微减弱了以反映容器的并发特性。由于 size 返回的结果在计算时可能已经过期了,它实际上只是一个估计值,因此允许 size 返回一个近似值而不是一个精确值。事实上 sizeisEmpty 这样的方法在并发环境下的用处很小,因为它们的返回值总是不断变化。因此,这些操作的需求被弱化了,以及换取对其他更重要操作的性能优化,包括 getputcontainsKeyremove 等。

ConcurrentHashMap 中没有实现对 Map 加锁以提供独占访问,而在 HashtablesynchronizedMap 中,获得 Map 的锁能防止其他线程访问这个 Map。大多数情况下,用 ConcurrentHashMap 来代替同步 Map 能进一步提高代码的可伸缩性。

2. 额外的原子Map操作

由于 ConcurrentHashMap 不能被加锁来执行独占访问,因此无法使用客户端加锁来创造新的原子操作。 不过像 “若没有则添加”、“若相等则移除” 和 “若相等则替换” 等,都已经实现为原子操作并且在 ConcurrentMap 的接口中声明,如下代码所示:

	public intercace ConcurrentHashMap<K, V> extends Map<K, V> 
		// 仅当 K 没有相应的映射值时才插入
		V putIfAbsent(K key, V value);
		
		// 仅当 K 被映射到 V 才移除
		boolean remove(K key, V value);
		
		// 仅当 K 被映射到 oldValue 时才替换为 newValue
		boolean replace(K key, V oldValue, V newValue);
		
		// 仅当K 被映射到某个值时才替换为 newValue
		V replace(K key, V newValue);
	

如果你需要在现有的同步 Map 中添加如上的操作,那么也就意味着应该考虑使用 ConcurrentMap 了。

3. CopyOnWriteArrayList

CopyOnWriteArrayList 用于替代同步 List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。

类似地,CopyOnWriteArraySet 用于替代同步Set。

“写入时复制(Copy-On-Write)”容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。“写入时复制” 容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。

显然,每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时。仅当迭代操作远远多于修改操作时,才应该使用 “写入时复制” 容器。

许多事件通知系统中,在分发通知时需要迭代已注册监听器链表,并调用每一个监听器,在大多数情况下,注册和注销事件监听器的操作远少于接收事件通知的操作。

4. 阻塞队列

这块的篇幅较多,下一篇博文将会详细介绍,尽情期待!

以上是关于并发编程从零开始-同步工具类的主要内容,如果未能解决你的问题,请参考以下文章

从零开始学多线程之构建快

并发编程从零开始(十五)-CompletableFuture

Java并发编程学习8-同步工具类

Java并发编程学习8-同步工具类

Java并发编程学习8-同步工具类

java并发编程之工具类