并发编程从零开始-同步工具类
Posted 会编程的老六
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程从零开始-同步工具类相关的知识,希望对你有一定的参考价值。
并发编程从零开始(十)-同步工具类
6 同步工具类
6.1 Semaphore
Semaphore也就是信号量,提供了资源数量的并发访问控制,其使用代码很简单,如下所示:
有参方法tryAcquire(long timeout, TimeUnit unit)的作用是在指定的时间内尝试地获取1个许可,如果获取不到就返回false。
可以使用Semphore实现一个简易的抢座:
public class Main {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 5; i++) {
Thread.sleep(500);
new MySemphore(semaphore).start();
}
}
}
public class MySemphore extends Thread{
private Semaphore semaphore;
public MySemphore(Semaphore semaphore){
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+": working");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+": releasing");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如下图所示,假设有n个线程来获取Semaphore里面的10份资源(n > 10),n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。
当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为如此,Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分。Semaphore相关类的继承体系如下图所示:
由于Semaphore和锁的实现原理基本相同。资源总数即state的初始值,在acquire里对state变量进行CAS减操作,减到0之后,线程阻塞;在release里对state变量进行CAS加操作。
6.2 CountDownLatch
6.2.1 CountDownLatch使用场景
假设一个主线程要等待5个 Worker 线程执行完才能退出,可以使用CountDownLatch来实现:
线程:
public class MyThread extends Thread{
private final CountDownLatch countDownLatch;
private final Random random = new Random();
public MyThread(String name ,CountDownLatch countDownLatch){
super(name);
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 运行结束");
countDownLatch.countDown();
}
}
Main类:
public class Main {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
new MyThread("THREAD"+i,countDownLatch).start();
}
//当前线程等待
countDownLatch.await();
System.out.println("程序运行结束");
}
}
下图为CountDownLatch相关类的继承层次,CountDownLatch原理和Semaphore原理类似,同样是基于AQS,不过没有公平和非公平之分。
6.2.2 await()实现分析
await()调用的是AQS 的模板方法,CountDownLatch.Sync重新实现了tryAccuqireShared方法。
从tryAcquireShared(...)方法的实现来看,只要state != 0,调用await()方法的线程便会被放入AQS的阻塞队列,进入阻塞状态。
6.2.3 countDown()实现分析
countDown()调用的AQS的模板方法releaseShared(),里面的tryReleaseShared(...)由CountDownLatch.Sync实现。从上面的代码可以看出,通过CAS减少state的值,只有state=0,tryReleaseShared(...)才会返回true,然后执行doReleaseShared(...),一次性唤醒队列中所有阻塞的线程。
总结:由于是基于AQS阻塞队列来实现的,所以可以让多个线程都阻塞在state=0条件上,通过countDown()一直减state,减到0后一次性唤醒所有线程。如下图所示,假设初始总数为M,N个线程await(),M个线程countDown(),减到0之后,N个线程被唤醒。
6.3 CyclicBarrier-循环屏障
6.3.1 CyclicBarrier 使用场景
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做 的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一 个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier使用方式比较简单:
//创建CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
// 当所有线程被唤醒时,执行Runnable。
System.out.println("do something");
}
});
//线程进入到屏障进行阻塞
cyclicBarrier.await();
实现阶段运行:
Main:
public class Main {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("finish");
}
});
for (int i = 0; i < 3; i++) {
new MyThread(cyclicBarrier).start();
}
}
}
MyThread:
public class MyThread extends Thread{
private final CyclicBarrier cyclicBarrier;
private final Random random = new Random();
public MyThread(CyclicBarrier cyclicBarrier){
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
Thread.sleep(random.nextInt(2000));
System.out.println(Thread.currentThread().getName()+" A begin");
cyclicBarrier.await();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+" B begin");
cyclicBarrier.await();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+" C begin");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
在整个过程中,有2个同步点。只有所有的线程全部到达了同步点之后,最后一个进入的线程将唤醒所有被阻塞的线程。
6.3.2 CyclicBarrier实现原理
CyclicBarrier基于ReentrantLock+Condition实现
//该内部类用于表明当前循环屏障的状态,当broken为true时表示障碍器发生了异常
private static class Generation {
boolean broken = false;
}
//CyclicBarrier内部的显示锁
private final ReentrantLock lock = new ReentrantLock();
//通过上面的显式锁得到的Condition变量,障碍器能够阻塞和唤醒多个线程完全得益于这个Condition
private final Condition trip = lock.newCondition();
//临界值,当障碍器阻塞的线程数等于parties时即count=0,障碍器将会通过trip唤醒目前所有阻塞的线程
private final int parties;
//条件线程,当屏障被打破时,在障碍器通过trip唤醒所有正被阻塞的的线程之前,执行该线程,这个线程可以充当一个主线程,那些被阻塞的线程可以充当子线程,即可以实现当所有子线程都达到屏障时调用主线程的作用
private final Runnable barrierCommand;
//内部类Generation变量表示当前循环屏障CyclicBarrier的状态
private Generation generation = new Generation();
//计数器,用于计算还剩多少个线程还没有达到屏障处,初始值应该等于临界值parties
private int count;
构造方法:
await()方法实现过程:
//timed:表示是否设置了等待时间
//nanos等待的时间(纳秒)
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//使用CyclicBarrier定义的显示锁,加锁避免并发问题
lock.lock();
try {
//当前循环屏障的状态
final Generation g = generation;
//如果为true,表示障碍器之前发生了异常,抛出异常BrokenBarrierException
if (g.broken)
throw new BrokenBarrierException();
//当前线程是否被中断
if (Thread.interrupted()) {
breakBarrier();//该方法会重置计数值count为parties,并且唤醒所有被阻塞的线程,并改变状态Generation
throw new InterruptedException();
}
//屏障计数器减一
int index = --count;
//如果index等于0 ,达到屏障的线程的数量等于最开始设置的数量parties
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//如果条件线程不为空,则执行条件线程
if (command != null)
command.run();
ranAction = true;
//唤醒所有被阻塞的线程,并且重置计数器count,生成新的状态generation
nextGeneration();
return 0;
} finally {
if (!ranAction)//如果ranAction为true,表示上面的代码没有顺利执行结束,表示障碍器发生了异常,调用breakBarrier重置计数器,并设置generation.broken=true表示当前的状态
breakBarrier();
}
}
// 当计数器为零调用了Condition的唤醒方法、或者broken为true、或者线程中断、或者等待超时时跳出异常
for (;;) {
try {
//阻塞当前线程,如果timed为false表示没有设置等待的时间
if (!timed)
//不限时阻塞线程,只有当调用唤醒方法后才会继续执行
trip.await();
else if (nanos > 0L)
//等待nanos毫秒
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//调用await方法如果发生异常,并且此时CyclicBarrier还没有调用nextGeneration()方法重置计数器和generation
if (g == generation && ! g.broken) {
breakBarrier();//该方法会唤醒所有阻塞的线程,并且重置计数器,而且设置generation.broken = true表示障碍器发生了异常。
throw ie;
} else {
//中断当前线程
Thread.currentThread().interrupt();
}
}
//g.broken为true,表示障碍器发生了异常,抛出异常
if (g.broken)
throw new BrokenBarrierException();
//index=0的唤醒操作顺利执行完了,所以通过nextGeneration()方法更新了generation,而由于generation是线程中的共享变量,所以当前线程此时 g!=generation
if (g != generation)
return index;
//如果timed为true表示设置了线程阻塞的时间,然后时间nanos却小于等于0,
if (timed && nanos <= 0L) {
breakBarrier();//此时重置计数器,并且设置generation.broken=true表示障碍器发生异常
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
//唤醒所有线程,重置计数器count,重新生成generation
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
//设置generation.broken=true表示障碍器发生的异常,重置计数器count,唤醒所有阻塞的线程
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
关于上面的方法,有几点说明:
-
CyclicBarrier是可以被重用的。以应聘场景为例,来了10个线程,这10个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。
-
CyclicBarrier 会响应中断。10 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()方法。然后count被重置为初始值(parties),重新开始。
-
breakBarrier()只会被第10个线程执行1次(在唤醒其他9个线程之前),而不是10个线程每个都执行1次。
6.4 Exchanger
6.4.1 使用场景
Exchanger用于线程之间交换数据,其使用代码很简单,是一个exchange(...)方法,使用示例如下:
public class Main {
private static final Random random = new Random();
public static void main(String[] args) {
// 建一个多线程共用的exchange对象
// 把exchange对象传给3个线程对象。每个线程在自己的run方法中调用exchange,把自己的数据作为参数
// 传递进去,返回值是另外一个线程调用exchange传进去的参数
Exchanger<String> exchanger = new Exchanger<>();
new Thread("线程1") {
@Override
public void run() {
while (true) {
try {
// 如果没有其他线程调用exchange,线程阻塞,直到有其他线程调 用exchange为止。
String otherData = exchanger.exchange("交换数据1");
System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
new Thread("线程2") {
@Override
public void run() {
while (true) {
try {
String otherData = exchanger.exchange("交换数据2");
System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
new Thread("线程3") {
@Override
public void run() {
while (true) {
try {
String otherData = exchanger.exchange("交换数据3");
System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}
在上面的例子中,3个线程并发地调用exchange(...),会两两交互数据,如1/2、1/3和2/3。
6.4.2 实现原理
Exchanger的核心机制和Lock一样,也是CAS+park/unpark。
park/unpark推荐阅读:https://www.cnblogs.com/set-cookie/p/9582547.html
首先,在Exchanger内部,有两个内部类:Participant和Node,代码如下:
每个线程在调用exchange(...)方法交换数据的时候,会先创建一个Node对象。
这个Node对象就是对该线程的包装,里面包含了3个重要字段:第一个是该线程要交互的数据,第二个是对方线程交换来的数据,最后一个是该线程自身。
一个Node只能支持2个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Node,因此在Exchanger里面定义了Node数组:
private volatile Node[] arena;
6.4.3 exchange( V x )实现分析
一对一交换数据使用slotExchange,其余情况使用arenaExchange。
上面方法中,如果arena不是null,表示启用了arena方式交换数据。如果arena不是null,并且线程被中断,则抛异常。
如果arena不是null,但是arenaExchange的返回值为null,则抛异常。对方线程交换来的null值是封装为NULL_ITEM对象的,而不是null。
如果slotExchange的返回值是null,并且线程被中断,则抛异常。
如果slotExchange的返回值是null,并且areaExchange的返回值是null,则抛异常。
slotExchange的实现:
arenaExchange的实现:
6.5 Phaser-移相器
6.5.1 用Phaser代替CyclicBarrier 和 CountDownLatch
从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。
CyclicBarrier解决了CountDownLatch不能重用的问题,但是仍有以下不足:
1)不能动态调整计数器值,假如线程数不足以打破barrier,就只能reset或者多加些线程,在实际运用中显然不现实
2)每次await仅消耗1个计数器值,不够灵活
Phaser就是用来解决这些问题的。Phaser将多个线程协作执行的任务划分为多个阶段,每个阶段都可以有任意个参与者,线程可以随时注册并参与到某个阶段。
代替CountDownLatch:
public class PhaserInsteadOfCountDownLatch {
public static void main(String[] args) {
Phaser phaser = new Phaser(5);
for (int i = 0; i < 5; i++) {
new Thread("thread-"+(i+1)){
private final Random random = new Random();
@Override
public void run() {
System.out.println(getName()+" start");
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName()+" end");
phaser.arrive();
}
}.start();
}
System.out.println("threads start finish");
phaser.awaitAdvance(phaser.getPhase());
System.out.println("threads end finish");
}
}
代替CyclicBarrier:
Main:
public class PhaserInsteadOfCyclicBarrier {
public static void main(String[] args) {
Phaser phaser = new Phaser(5);
for (int i = 0; i < 5; i++) {
new MyThread(phaser).start();
}
phaser.awaitAdvance(0);
}
}
MyThread:
public class MyThread extends Thread{
private final Phaser phaser;
private final Random random = new Random();
public MyThread(Phaser phaser){
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println("start a");
Thread.sleep(500);
System.out.println("end a");
phaser.arriveAndAwaitAdvance();
System.out.println("start b");
Thread.sleep(500);
System.out.println("end b");
phaser.arriveAndAwaitAdvance();
System.out.println("start c");
Thread.sleep(500);
System.out.println("end c");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
arriveAndAwaitAdance()就是 arrive()与 awaitAdvance(int)的组合,表示“我自己已到达这个同步点,同时要等待所有人都到达这个同步点,然后再一起前行”。
6.5.2 Phaser新特性
-
动态调整线程个数
CyclicBarrier 所要同步的线程个数是在构造方法中指定的,之后不能更改,而 Phaser 可以在运行期间动态地调整要同步的线程个数。Phaser 提供了下面这些方法来增加、减少所要同步的线程个数。
-
层次Phaser
多个Phaser可以组成如下图所示的树状结构,可以通过在构造方法中传入父Phaser来实现。
public Phaser(Phaser parent, int parties){ //.... }
通过parent节点来存储树状结构:
private final Phaser parent;
可以发现,在Phaser的内部结构中,每个Phaser记录了自己的父节点,但并没有记录自己的子节点列表。所以,每个 Phaser 知道自己的父节点是谁,但父节点并不知道自己有多少个子节点,对父节点的操作,是通过子节点来实现的。
树状的Phaser怎么使用呢?考虑如下代码,会组成下图的树状Phaser。
Phaser root = new Phaser(2);
Phaser c1 = new Phaser(root,3);
Phaser c2 = new Phaser(root,2);
Phaser c3 = new Phaser(c1,0);
本来root有两个参与者,然后为其加入了两个子Phaser(c1,c2),每个子Phaser会算作1个参与者,root的参与者就变成2+2=4个。c1本来有3个参与者,为其加入了一个子Phaser c3,参与者数量变成3+1=4个。c3的参与者初始为0,后续可以通过调用register()方法加入。
对于树状Phaser上的每个节点来说,可以当作一个独立的Phaser来看待,其运作机制和一个单独的Phaser是一样的。
父Phaser并不用感知子Phaser的存在,当子Phaser中注册的参与者数量大于0时,会把自己向父节点注册;当子Phaser中注册的参与者数量等于0时,会自动向父节点解除注册。父Phaser把子Phaser当作一个正常参与的线程就即可。
6.5.3 state变量解析
大致了解了Phaser的用法和新特性之后,下面仔细剖析其实现原理。Phaser没有基于AQS来实现,但具备AQS的核心特性:state变量、CAS操作、阻塞队列。先从state变量说起。
private volatile long state;
这个64位的state变量被拆成4部分:
Phaser提供了一系列的成员方法来从state中获取上图中的几个数字。下面再看一下state变量在构造方法中是如何被赋值的:
其中,已经定义了:
private static final int EMPTY = 1;
private static final int PHASE_SHIFT = 32;
private static final int PARTIES_SHIFT = 16;
当parties=0时,state被赋予一个EMPTY常量,常量为1;
当parties != 0时,把phase值左移32位;把parties左移16位;然后parties也作为最低的16位,3个值做或操作,赋值给state。
6.5.4 阻塞与唤醒(Treiber Stack)
基于上述的state变量,对其执行CAS操作,并进行相应的阻塞与唤醒。如下图所示,右边的主线程会调用awaitAdvance()进行阻塞;左边的arrive()会对state进行CAS的累减操作,当未到达的线程数减到0时,唤醒右边阻塞的主线程。
在这里,阻塞使用的是一个称为Treiber Stack的数据结构,而不是AQS的双向链表。Treiber Stack是一个无锁的栈,它是一个单向链表,出栈、入栈都在链表头部,所以只需要一个head指针,而不需要tail指针,如下的实现:
为了减少并发冲突,这里定义了2个链表,也就是2个Treiber Stack。当phase为奇数轮的时候,阻塞线程放在oddQ里面;当phase为偶数轮的时候,阻塞线程放在evenQ里面。代码如下所示:
6.5.5 arrive() 方法分析
下面看arrive()方法是如何对state变量进行操作,又是如何唤醒线程的。
其中,定义了变量:
private static final int ONE_ARRIVAL = 1;
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
private static final int ONE_DEREGISTER = ONE_ARRIVAL\\ONE_PARTY;
private static final int PARTIES_SHIFT = 16;
arrive()和 arriveAndDeregister()内部调用的都是 doArrive(boolean)方法。
区别在于前者只是把“未达到线程数”减1;后者则把“未到达线程数”和“下一轮的总线程数”都减1。下面看一下doArrive(boolean)方法的实现。
关于上面的方法,有以下几点说明:
- 定义了2个常量如下。当 deregister=false 时,只最低的16位需要减 1,adj=ONE_ARRIVAL;当deregister=true时,低32位中的2个16位都需要减1,adj=ONE_ARRIVAL|ONE_PARTY。
- 把未到达线程数减1。减了之后,如果还未到0,什么都不做,直接返回。如果到0,会做2件事情:第1,重置state,把state的未到达线程个数重置到总的注册的线程数中,同时phase加 1;第2,唤醒队列中的线程。
下面看一下唤醒方法:
遍历整个栈,只要栈当中节点的phase不等于当前Phaser的phase,说明该节点不是当前轮的,而是前一轮的,应该被释放并唤醒。
6.5.6 awaitAdvance()方法分析
下面的while循环中有4个分支:
初始的时候,node==null,进入第1个分支进行自旋,自旋次数满足之后,会新建一个QNode节点;
之后执行第3、第4个分支,分别把该节点入栈并阻塞。
这里调用了ForkJoinPool.managedBlock(ManagedBlocker blocker)方法,目的是把node对应的线程阻塞。ManagerdBlocker是ForkJoinPool里面的一个接口,定义如下:
QNode实现了该接口,实现原理还是park(),如下所示。之所以没有直接使用park()/unpark()来实现阻塞、唤醒,而是封装了ManagedBlocker这一层,主要是出于使用上的方便考虑。一方面是park()可能被中断唤醒,另一方面是带超时时间的park(),把这二者都封装在一起。
理解了arrive()和awaitAdvance(),arriveAndAwaitAdvance()就是二者的一个组合版本。
Java并发编程学习6-同步工具类和并发容器
本篇开始将要介绍 Java 平台类库下的一些最常用的 并发基础构建模块,以及使用这些模块来构造并发应用程序时的一些常用模式。
同步容器类
同步容器类包括 Vector 和 Hashtable,还有由 Collections.synchronizedXxx 等工厂方法创建的同步的封装器类。
这些类实现线程安全性的方法是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。
1. 同步容器类的问题
同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。
容器里常见的复合操作包括:
- 迭代(反复访问元素,直到遍历完容器中的所有元素)
- 跳转(根据指定顺序找到当前元素的下一个元素)
- 条件运算(例如“若没有则添加”)
下面假设我们在 Vector 中定义两个复合操作的方法:getLast 和 deleteLast,它们都会执行 “先检查后执行” 操作。
public static Object getLast(Vector list)
int lastIndex = list.size() - 1;
return list.get(lastIndex);
public static void deleteLast(Vector list)
int lastIndex = list.size() - 1;
list.remove(lastIndex);
上面定义的两个方法,看似没有任何问题,从某种程度上来看也的确如此,无论多少个线程同时调用它们,也不会破坏 Vector。
但如果从调用者的角度去看,如果线程 A 在包含 10 个元素的 Vector 上调用 getLast,同时线程 B 在同一个 Vector 上调用 deleteLast,这些操作交替执行如下图所示,getLast 将抛出 ArrayIndexOutOfBoundsException 异常。这里虽然很好地遵循了 Vector 的规范(如果请求一个不存在的元素,那么将抛出一个异常),但这并不是调用者所希望看到的结果,除非 Vector 一开始就是空的。
同步容器类通过自身的锁来保护它的每个方法,因此只要获得容器类的锁,上面的 getLast 和 deleteLast 方法就可以成为原子操作。
下面看一下代码示例:
public static Object getLast(Vector list)
synchronized (list)
int lastIndex = list.size() - 1;
return list.get(lastIndex);
public static void deleteLast(Vector list)
synchronized (list)
int lastIndex = list.size() - 1;
list.remove(lastIndex);
与 getLast 一样,如果在对 Vector 进行迭代时,另一个线程删除了一个元素,并且这两个操作交替执行,那么这种迭代方法也将抛出 ArrayIndexOutOfBoundsException 异常。
for (int i = 0; i < vector.size(); i++)
doSomething(vector.get(i));
虽然上述迭代操作可能抛出异常,但并不意味着 Vector 就不是线程安全的。Vector 的状态仍然是有效的,而抛出的异常也与其规范保持一致。
像读取最后一个元素或者迭代这样的简单操作中抛出异常,显然是调用者不愿意看到的。我们可以通过在迭代期间持有 Vector 的锁,可以防止其他线程在迭代期间修改 Vector。当然这会导致其他线程在迭代期间无法访问它,从而降低了并发性。
synchronized (vector)
for (int i = 0; i < vector.size(); i++)
doSomething(vector.get(i));
2. 迭代器与 ConcurrentModificationException
在设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为是 “及时失败” 的。这意味着,当它们发现容器在迭代过程中被修改时,就会抛出一个 ConcurrentModificationException 异常。
这种 “及时失败” 的迭代器只能作为并发问题的预警指示器。如果在迭代期间计数器被修改,那么 hasNext 或 next 将抛出 ConcurrentModificationException。然而,这种检查是在没有同步的情况下进行的,因此可能会看到失效的计数值,而迭代器可能并没有意识到已经发生了修改。这是一种设计上的权衡,从而降低并发修改操作的检测代码对程序性能带来的影响。
下面我们看一个代码示例,使用 for-each 循环语法对 List 容器进行迭代。
List<Person> personList = Collections.synchronizedList(new ArrayList<Person>());
// 可能抛出 ConcurrentModificationException
for (Person p : personList)
doSomething(p);
从编译后的Class文件来看,上述 for-each 循环语法,javac 将生成使用 Iterator 的代码,反复调用 hasNext 和 next 来迭代 List 对象。 与迭代 Vector 一样,想要避免出现 ConcurrentModificationException,就必须在迭代过程中持有容器的锁。
如果不希望在迭代期间对容器加锁,那么可以“克隆”容器,并在副本上进行迭代。由于副本被封闭在线程内,因此其他线程不会在迭代期间对其进行修改,这样就避免了抛出 ConcurrentModificationException(在克隆过程中仍然需要对容器加锁)。
当然克隆容器存在显著的性能开销。这种方式的好坏,取决于容器的大小,在每个元素上执行的操作,迭代操作相对于容器其他操作的调用频率,以及在响应时间和吞吐量等方面的需求。
3. 隐藏迭代器
虽然加锁可以防止迭代器抛出 ConcurrentModificationException,但需要记住在所有对共享容器进行迭代的地方都需要加锁。
下面我们来看一个示例,在 HiddenIterator 中没有显式的容器迭代操作,但在 System.out.pringln 中将执行迭代操作。
@NotThreadSafe
public class HiddenIterator
@GuardedBy("this")
private final Set<Integer> set = new HashSet<Integer>();
public synchronized void add(Integer i)
set.add(i);
public synchronized void remove(Integer i)
set.remove(i);
public void addTenThings()
Random r = new Random();
for (int i = 0; i < 10; i++)
add(r.nextInt());
// 隐藏在字符串连接中的迭代操作
System.out.pringln("DEBUG: added ten elements to " + set);
上述 System.out.pringln 代码中,编译器将字符串的连接操作转换为调用 StringBuilder.append(Object),而这个方法又会调用容器的 toString 方法,标准容器的 toString 方法将迭代容器,并在每个元素上调用 toString 来生成容器内容的格式化表示。并发环境下,addTenThings 方法可能会抛出 ConcurrentModificationException。
如果状态与保护它的同步代码之间相隔越远,那么开发人员就越容易忘记在访问状态时使用正确的同步。如果 HiddenIterator 用 synchronizedSet 来包装 HashSet,并且对同步的代码进行封装,那么就不会发生这种错误。
正如封装对象的状态有助于维持不变性条件一样,封装对象的同步机制同样有助于确保实施同步策略。
除了 toString 对容器进行迭代,还有容器的 hashCode、equals、containsAll、removeAll 和 retainAll 等方法,以及把容器作为参数的构造函数,都会对容器进行迭代。所有这些间接的迭代操作都有可能抛出 ConcurrentModificationException。
并发容器
上面提到的同步容器,它是将所有对容器状态的访问都串行化,以实现它们的线程安全性。这种方式的代价就是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低。
并发容器是针对多个线程并发访问而设计,如 ConcurrentHashMap,用于替代同步且基于散列的 Map;CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的 List。
通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。
1. ConcurrentHashMap
与 HashMap 一样,ConcurrentHashMap 也是一个基于散列的 Map, 但它使用了一种粒度更细的加锁机制来实现更大程度的共享,提供更高的并发性和伸缩性,这种机制称为分段锁(Lock Striping,以后的博文会讲解到)。在这种机制中,任意数量的读取线程可以并发地访问 Map,执行读取操作的线程和执行写入操作的线程可以并发地访问 Map,并且一定数量的写入线程可以并发地修改 Map。
ConcurrentHashMap 与其他并发容器一起增强了同步容器类,有如下的特点:
- 它们提供的迭代器不会抛出 ConcurrentModificationException,因此不需要再迭代过程中对容器加锁。
- ConcurrentHashMap 返回的迭代器具有弱一致性(Weakly Consistent),而并非 ”及时失败“。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。
对于一些需要在整个 Map 上进行计算的方法,例如 size 和 isEmpty,这些方法的语义被略微减弱了以反映容器的并发特性。由于 size 返回的结果在计算时可能已经过期了,它实际上只是一个估计值,因此允许 size 返回一个近似值而不是一个精确值。事实上 size 和 isEmpty 这样的方法在并发环境下的用处很小,因为它们的返回值总是不断变化。因此,这些操作的需求被弱化了,以及换取对其他更重要操作的性能优化,包括 get、put、containsKey 和 remove 等。
在 ConcurrentHashMap 中没有实现对 Map 加锁以提供独占访问,而在 Hashtable 和 synchronizedMap 中,获得 Map 的锁能防止其他线程访问这个 Map。大多数情况下,用 ConcurrentHashMap 来代替同步 Map 能进一步提高代码的可伸缩性。
2. 额外的原子Map操作
由于 ConcurrentHashMap 不能被加锁来执行独占访问,因此无法使用客户端加锁来创造新的原子操作。 不过像 “若没有则添加”、“若相等则移除” 和 “若相等则替换” 等,都已经实现为原子操作并且在 ConcurrentMap 的接口中声明,如下代码所示:
public intercace ConcurrentHashMap<K, V> extends Map<K, V>
// 仅当 K 没有相应的映射值时才插入
V putIfAbsent(K key, V value);
// 仅当 K 被映射到 V 才移除
boolean remove(K key, V value);
// 仅当 K 被映射到 oldValue 时才替换为 newValue
boolean replace(K key, V oldValue, V newValue);
// 仅当K 被映射到某个值时才替换为 newValue
V replace(K key, V newValue);
如果你需要在现有的同步 Map 中添加如上的操作,那么也就意味着应该考虑使用 ConcurrentMap 了。
3. CopyOnWriteArrayList
CopyOnWriteArrayList 用于替代同步 List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。
类似地,CopyOnWriteArraySet 用于替代同步Set。
“写入时复制(Copy-On-Write)”容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。“写入时复制” 容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。
显然,每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时。仅当迭代操作远远多于修改操作时,才应该使用 “写入时复制” 容器。
许多事件通知系统中,在分发通知时需要迭代已注册监听器链表,并调用每一个监听器,在大多数情况下,注册和注销事件监听器的操作远少于接收事件通知的操作。
4. 阻塞队列
这块的篇幅较多,下一篇博文将会详细介绍,尽情期待!
以上是关于并发编程从零开始-同步工具类的主要内容,如果未能解决你的问题,请参考以下文章