一、同步容器
同步容器包括两类:
- Vector、Hashtable、Stack
- 同步的封装器类由 Collections.synchronizedXXX 等工厂方法创建的。(JDK1.2加入)
??这些类实现线程安全的方式是:将他们的状态封装起来,并对每个公有方法都进行同步,使得每一次只有一个线程能访问容器的状态。 同步容器类的出现是为了解决 Collection、Map 不能同步,线程不安全的问题。
同步容器类的问题
??同步容器类都是线程安全的,但不是绝对的线程安全 (所谓线程安全仅仅是在每一个方法上加锁,保持原子)。在某些情况下,需要额外加锁来保护复合操作。复合类操作如:迭代(反复访问元素,遍历完容器中的所有元素)、跳转(根据指定的顺序找到当前元素的下一个元素)、以及条件运算(如“若没有则添加”)。这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为。
看下面三种“意外”情况:
1. 获取与删除的复合操作
??下面的代码看起来没什么问题,但如果一旦出现:线程A执行 getLast() 方法,线程B执行 deleteLast() 方法;线程A,线程B 交替执行,getLast() 方法就可能会抛出 ArrayIndexOutOfBoundsException(数组下标越界)。
public static Object getLast(Vector list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public static void deleteLast(Vector list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
??为防止这种情况出现,就要额外加锁,使 getLast()、deleteLast() 方法成为原子性操作。正确的写法如下:
public static Object getLast(Vector list){
synchronized(this){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
public static void deleteLast(Vector list){
synchronized(this){
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
2. 普通迭代
for(int i = 0 ;i < vector.size(); i++){
doSome(vector.get(i));
}
??这种迭代方法的正确性完全依赖于运气:我们无法保证在调用size与get直接按有没有其他线程对所操作的这个Vector进行了修改。但是这并不代表Vector就不是线程安全的。Vector仍然是线程安全的,而抛出的异常也与其规范保持一致。和 getLast()的例子一样,如果 遍历列表的线程 与 删除的线程 交替执行,同样也会抛出 ArrayIndexOutOfBoundsException。
改进的写法:
synchronized(vector){
for(int i = 0 ;i < vector.size(); i++){
doSome(vector.get(i));
}
}
3. 迭代器 Iterator 与 ConcurrentModificationException
??对容器的标准迭代方式是使用 Iterator。然而,在迭代的期间,如果有线程并发地修改同步容器的,那么即使使用 Iterator 也无法避免对容器进行加锁。这是由于早期迭代器设计的时候并没有考虑并发修改的问题。而且,一旦失败,将会抛出 ConcurrentModificationException .
Collection c = Collections.synchronizedCollection(myCollection);
...
synchronized(c) {
Iterator i = c.iterator(); // Must be in the synchronized block
while (i.hasNext())
foo(i.next());
}
??有时候程序员并不希望在迭代期间对容器加锁。特别是容器迭代的规模大的时候,就可能需要长时间加锁,会造成锁的竞争激烈,降低程序的伸缩性。替代的方法是,克隆出一个副本,在副本上迭代。但也要进行权衡,因为克隆复制也需要额外的开销。
隐藏的迭代器
??容器的有些方法是进行迭代的,这些方法也要记得对其加锁。我们看看下面这个程序:
public class HiddenIterator{
@GuardedBy(this)
private final Set<Integer> set = new HashSet<Integer>();
public synchronized void add(Integer i ){ set.add(i); }
public synchronized void remove(Integer i ){ set.remove(i); }
public void addTenThings(){
Random r = new Random();
for(int i = 0 ; i < 10; i++)
add(r.nextInt());
System.out.println("DEBUG : added ten elements to" + set);
}
}
??上面的程序看起来好像也没问题,add、remove都加锁了。然而 addTenThings() 的最一行输出中,调用了 Set.toString()方法,toString()方法又是对容器进行了迭代,也可能抛出 ConcurrentModificationException 。所以,也要对 addTenThings()方法加锁。
??隐藏了迭代容器操作的方法: toString()、hashcode()、equals()、containsAll()、removeAll()、retainAll()。还有 forEach 的写法。
二、并发容器类
??同步类容器的状态都是串行化的(使用 synchronized 加锁的,同一时间只能一个线程访问容器,一个个排队访问,这就是串行化)。他们虽然实现了线程安全,但是严重降低了并发性,在多线程环境时,严重降低了应用的吞吐量。
看一下源代码,更加直观:
下面是 Collections.SynchronizedCollection() 方法的源代码。
public static <T> Collection<T> synchronizedCollection(Collection<T> c) {
return new SynchronizedCollection<>(c);
}
synchronizedCollection()方法是直接创建并返回一个 SynchronizedCollection 类的对象,这个类是 Collections 的静态内部类,继续跟踪。
static class SynchronizedCollection<E> implements Collection<E>, Serializable {
private static final long serialVersionUID = 3053995032091335093L;
final Collection<E> c; // 非线程安全的 Collection
final Object mutex; // Object on which to synchronize
SynchronizedCollection(Collection<E> c) {
//判断集合c是否是为null,为null就抛异常
this.c = Objects.requireNonNull(c);
mutex = this;
}
SynchronizedCollection(Collection<E> c, Object mutex) {
//判断集合c是否是为null,为null就抛异常
this.c = Objects.requireNonNull(c);
this.mutex = Objects.requireNonNull(mutex);
}
//封装集合c的size()方法,synchronized修饰,变成同步方法
public int size() {
synchronized (mutex) {return c.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return c.isEmpty();}
}
public boolean contains(Object o) {
synchronized (mutex) {return c.contains(o);}
}
public Object[] toArray() {
synchronized (mutex) {return c.toArray();}
}
//........
??可以看出,Collections.SynchronizedCollection类其实就是封装了 非线程安全的Collection 类对象,在 Collection 的每个方法上加上 synchronized。
再看一下 Vector add()方法的源代码:
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
??从上面的源码可以得知:同步容器的线程安全都是用 synchronized 来实现的,而且锁住整个方法区,即方法区的所有代码都是临界区,这就导致了同一时刻,只能有一个线程访问容器。换句话说,只能同步地访问容器,无法并发地访问容器,在高并发的情况下,将会非常地糟糕。
这时候,高性能的并发容器出现了
??java5.0之后提供了多种并发容器来改善同步容器的性能,如 ConcurrentHashMap、CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue;其中 ConcurrentHashMap 用来替代 Hashtable ,CopyOnWriteArrayList 用来替代 Vector;
??并发容器类采用各种优化手段,尽可能让多线程并发访问容器:ConcurrentHashMap 的分段锁、ConcurrentLinkedQueue 的非阻塞的CAS算法、锁的粒度更细、以及针对多读少写的情况下的 “写时复制”。
下面重点说一下 ConcurrentHashMap
??ConcurrentHashMap 采用分段锁技术 ,同步容器中,是一个容器一个锁,但在ConcurrentHashMap中,会将hash表的数据分成若干段,每段维护一个锁,以达到高效的并发访问;
??ConcurrentHashMap 与 其他并发容器一样,在迭代的过程不需要加锁,迭代器具有弱一致性,迭代期间不会抛出ConcurrentModificationException异常,并非“立即失败”;所谓 弱一致性 ,就是返回的元素将反映迭代器创建时或创建后某一时刻的映射状态。同时,需要在整个Map上进行计算的方法,如 size()、isEmpty(),这些方法的语义被略微减弱,以反映并发的特性,换句话说,这些方法的值是一个估计值,并不是很精确。事实上,这些方法在并发环境下用处很小,因为在并发的情况下,它们的返回值总是在变化。如果需要强一致性,那么就得考虑加锁。同步容器类便是强一致性的。
??由于 ConcurrentHashMap 不能被加锁来执行独占访问,因此无法通过加锁来创建新的原子操作。不过,ConcurrentHashMap 提供了以下几个原子操作(由其父接口 ConcurrentMap 提供),基本满足需求了:
//如果指定键已经不再与某个值相关联,则将它与给定值关联。
V putIfAbsent(K key, V value);
//只有目前将键的条目映射到给定值时,才移除该键的条目。
boolean remove(Object key, Object value);
//只有目前将键的条目映射到某一值时,才替换该键的条目。
V replace(K key, V value);
//只有目前将键的条目映射到给定值时,才替换该键的条目。
boolean replace(K key,V oldValue, V newValue);
JDK 提供的并发容器还包括以下7个阻塞队列,如下:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
补充说明:上面的 ConcurrentHashMap 的介绍是基于 JDK1.6 版本的,JDK1.8 有所修改,可参考后续文章。
参考文献:
- 《并发编程的艺术》
- 《并发编程实战》