工作三年该懂的Java并发编程(干货)
Posted 里奥ii
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了工作三年该懂的Java并发编程(干货)相关的知识,希望对你有一定的参考价值。
本文的组织形式如下,主要会介绍到同步容器类,操作系统的并发工具,Java 开发工具包(只是简单介绍一下,后面会有源码分析)。同步工具类有哪些。
下面我们就来介绍一下 Java 并发中都涉及哪些模块,这些并发模块都是 Java 并发类库所提供的。
同步容器类
同步容器主要包括两类,一种是本来就是线程安全实现的容器,这类容器有 Vector、Hashtable、Stack,这类容器的方法上都加了 synchronized
锁,是线程安全的实现。
“Vector、Hashtable、Stack 这些容器我们现在几乎都不在使用,因为这些容器在多线程环境下的效率不高。
还有一类是由 Collections.synchronizedxxx
实现的非线程安全的容器,使用 Collections.synchronized 会把它们封装起来编程线程安全的容器,举出两个例子
- Collections.synchronizedList
- Collections.synchronizedMap
我们可以通过 Collections 源码可以看出这些线程安全的实现
要不为啥要称 Collections 为集合工具类呢?Collections 会把这些容器类的状态封装起来,并对每个同步方法进行同步,使得每次只有一个线程能够访问容器的状态。
其中每个 synchronized xxx
都是相当于创建了一个静态内部类。
虽然同步容器类都是线程安全的,但是在某些情况下需要额外的客户端加锁来保证一些复合操作的安全性,复合操作就是有两个及以上的方法组成的操作,比如最典型的就是 若没有则添加
,用伪代码表示则是
if(a == null)
a = get();
比如可以用来判断 Map 中是否有某个 key,如果没有则添加进 Map 中。这些复合操作在没有客户端加锁的情况下是线程安全的,但是当多个线程并发修改容器时,可能会表现出意料之外的行为。例如下面这段代码
public class TestVector implements Runnable
static Vector vector = new Vector();
static void addVector()
for(int i = 0;i < 10000;i++)
vector.add(i);
static Object getVector()
int index = vector.size() - 1;
return vector.get(index);
static void removeVector()
int index = vector.size() - 1;
vector.remove(index);
@Override
public void run()
getVector();
public static void main(String[] args)
TestVector testVector = new TestVector();
testVector.addVector();
Thread t1 = new Thread(() ->
for(int i = 0;i < vector.size();i++)
getVector();
);
Thread t2 = new Thread(() ->
for(int i = 0;i < vector.size();i++)
removeVector();
);
t1.start();
t2.start();
这些方法看似没有问题,因为 Vector 能够保证线程安全性,无论多少个线程访问 Vector 也不会造成 Vector 的内部产生破坏,但是从整个系统来说,是存在线程安全性的,事实上你运行一下,也会发现报错。
会出现
如果线程 A 在包含这么多元素的基础上调用 getVector
方法,会得到一个数值,getVector 只是取得该元素,而并不是从 vector 中移除,removeVector
方法是得到一个元素进行移除,这段代码的不安全因素就是,因为线程的时间片是乱序的,而且 getVector 和 removeVector 并不会保证互斥,所以在 removeVector 方法把某个值比如 6666 移除后,vector 中就不存在这个 6666 的元素,此时 getVector 方法取得 6666 ,就会抛出数组越界异常。为什么是数组越界异常呢?可以看一下 vector 的源码
如果用图表示的话,则会是下面这样。
所以,从系统的层面来看,上面这段代码也要保证线程安全性才可以,也就是在客户端加锁
实现,只要我们让复合操作使用一把锁,那么这些操作就和其他单独的操作一样都是原子性的。如下面例子所示
static Object getVector()
synchronized (vector)
int index = vector.size() - 1;
return vector.get(index);
static void removeVector()
synchronized (vector)
int index = vector.size() - 1;
vector.remove(index);
也可以通过锁住 .class
来保证原子性操作,也能达到同样的效果。
static Object getVector()
synchronized (TestVector.class)
int index = vector.size() - 1;
return vector.get(index);
static void removeVector()
synchronized (TestVector.class)
int index = vector.size() - 1;
vector.remove(index);
在调用 size 和 get 之间,Vector 的长度可能会发生变化,这种变化在对 Vector 进行排序时出现,如下所示
for(int i = 0;i< vector.size();i++)
doSomething(vector.get(i));
这种迭代的操作正确性取决于运气,即在调用 size 和 get 之间会修改 Vector,在单线程环境中,这种假设完全成立,但是再有其他线程并发修改 Vector 时,则可能会导致麻烦。
我们仍旧可以通过客户端加锁的方式来避免这种情况
synchronized(vector)
for(int i = 0;i< vector.size();i++)
doSomething(vector.get(i));
这种方式为客户端的可靠性提供了保证,但是牺牲了伸缩性,而且这种在遍历过程中进行加锁,也不是我们所希望看到的。
fail-fast
针对上面这种情况,很多集合类都提供了一种 fail-fast
机制,因为大部分集合内部都是使用 Iterator 进行遍历,在循环中使用同步锁的开销会很大,而 Iterator 的创建是轻量级的,所以在集合内部如果有并发修改的操作,集合会进行快速失败
,也就是 fail-fast
。当他们发现容器在迭代过程中被修改时,会抛出 ConcurrentModificationException
异常,这种快速失败不是一种完备的处理机制,而只是 善意
的捕获并发错误。
如果查看过 ConcurrentModificationException 的注解,你会发现,ConcurrentModificationException 抛出的原则由两种,如下
造成这种异常的原因是由于多个线程在遍历集合的同时对集合类内部进行了修改,这也就是 fail-fast 机制。
该注解还声明了另外一种方式
这个问题也是很经典的一个问题,我们使用 ArrayList 来举例子。如下代码所示
public static void main(String[] args)
List<String> list = new ArrayList<>();
for (int i = 0 ; i < 10 ; i++ )
list.add(i + "");
Iterator<String> iterator = list.iterator();
int i = 0 ;
while(iterator.hasNext())
if (i == 3)
list.remove(3);
System.out.println(iterator.next());
i ++;
该段代码会发生异常,因为在 ArrayList 内部,有两个属性,一个是 modCount
,一个是 expectedModCount
,ArrayList 在 remove 等对集合结构的元素造成数量上的操作会有 checkForComodification
的判断,如下所示,这也是这段代码的错误原因。
fail-safe
fail-safe
是 Java 中的一种 安全失败
机制,它表示的是在遍历时不是直接在原集合上进行访问,而是先复制原有集合内容,在拷贝的集合上进行遍历。由于迭代时是对原集合的拷贝进行遍历,所以在遍历过程中对原集合所作的修改并不能被迭代器检测到,所以不会触发 ConcurrentModificationException。java.util.concurrent
包下的容器都是安全失败的,可以在多线程条件下使用,并发修改。
比如 CopyOnWriteArrayList
, 它就是一种 fail-safe 机制的集合,它就不会出现异常,例如如下操作
List<Integer> integers = new CopyOnWriteArrayList<>();
integers.add(1);
integers.add(2);
integers.add(3);
Iterator<Integer> itr = integers.iterator();
while (itr.hasNext())
Integer a = itr.next();
integers.remove(a);
CopyOnWriteArrayList 就是 ArrayList 的一种线程安全的变体,CopyOnWriteArrayList 中的所有可变操作比如 add 和 set 等等都是通过对数组进行全新复制来实现的。
操作系统中的并发工具
讲到并发容器,就不得不提操作系统级别实现了哪些进程/线程间的并发容器,说白了其实就是数据结构的设计。下面我们就来一起看一下操作系统级别的并发工具
信号量
信号量是 E.W.Dijkstra 在 1965 年提出的一种方法,它使用一个整形变量来累计唤醒次数,以供之后使用。在他的观点中,有一个新的变量类型称作 信号量(semaphore)
。一个信号量的取值可以是 0 ,或任意正数。0 表示的是不需要任何唤醒,任意的正数表示的就是唤醒次数。
Dijkstra 提出了信号量有两个操作,现在通常使用 down
和 up
(分别可以用 sleep 和 wakeup 来表示)。down 这个指令的操作会检查值是否大于 0 。如果大于 0 ,则将其值减 1 ;若该值为 0 ,则进程将睡眠,而且此时 down 操作将会继续执行。检查数值、修改变量值以及可能发生的睡眠操作均为一个单一的、不可分割的 原子操作(atomic action)
完成。
互斥量
如果不需要信号量的计数能力时,可以使用信号量的一个简单版本,称为mutex(互斥量)
。互斥量的优势就在于在一些共享资源和一段代码中保持互斥。由于互斥的实现既简单又有效,这使得互斥量在实现用户空间线程包时非常有用。
互斥量是一个处于两种状态之一的共享变量:解锁(unlocked)
和 加锁(locked)
。这样,只需要一个二进制位来表示它,不过一般情况下,通常会用一个 整型(integer)
来表示。0 表示解锁,其他所有的值表示加锁,比 1 大的值表示加锁的次数。
mutex 使用两个过程,当一个线程(或者进程)需要访问关键区域时,会调用 mutex_lock
进行加锁。如果互斥锁当前处于解锁状态(表示关键区域可用),则调用成功,并且调用线程可以自由进入关键区域。
另一方面,如果 mutex 互斥量已经锁定的话,调用线程会阻塞直到关键区域内的线程执行完毕并且调用了 mutex_unlock
。如果多个线程在 mutex 互斥量上阻塞,将随机选择一个线程并允许它获得锁。
Futexes
随着并行的增加,有效的同步(synchronization)
和锁定(locking)
对于性能来说是非常重要的。如果进程等待时间很短,那么自旋锁(Spin lock)
是非常有效;但是如果等待时间比较长,那么这会浪费 CPU 周期。如果进程很多,那么阻塞此进程,并仅当锁被释放的时候让内核解除阻塞是更有效的方式。不幸的是,这种方式也会导致另外的问题:它可以在进程竞争频繁的时候运行良好,但是在竞争不是很激烈的情况下内核切换的消耗会非常大,而且更困难的是,预测锁的竞争数量更不容易。
有一种有趣的解决方案是把两者的优点结合起来,提出一种新的思想,称为 futex
,或者是 快速用户空间互斥(fast user space mutex)
,是不是听起来很有意思?
futex 是 Linux
中的特性实现了基本的锁定(很像是互斥锁)而且避免了陷入内核中,因为内核的切换的开销非常大,这样做可以大大提高性能。futex 由两部分组成:内核服务和用户库。内核服务提供了了一个 等待队列(wait queue)
允许多个进程在锁上排队等待。除非内核明确的对他们解除阻塞,否则它们不会运行。
Pthreads 中的互斥量
Pthreads 提供了一些功能用来同步线程。最基本的机制是使用互斥量变量,可以锁定和解锁,用来保护每个关键区域。希望进入关键区域的线程首先要尝试获取 mutex。如果 mutex 没有加锁,线程能够马上进入并且互斥量能够自动锁定,从而阻止其他线程进入。如果 mutex 已经加锁,调用线程会阻塞,直到 mutex 解锁。如果多个线程在相同的互斥量上等待,当互斥量解锁时,只有一个线程能够进入并且重新加锁。这些锁并不是必须的,程序员需要正确使用它们。
下面是与互斥量有关的函数调用
和我们想象中的一样,mutex 能够被创建和销毁,扮演这两个角色的分别是 Phread_mutex_init
和 Pthread_mutex_destroy
。mutex 也可以通过 Pthread_mutex_lock
来进行加锁,如果互斥量已经加锁,则会阻塞调用者。还有一个调用Pthread_mutex_trylock
用来尝试对线程加锁,当 mutex 已经被加锁时,会返回一个错误代码而不是阻塞调用者。这个调用允许线程有效的进行忙等。最后,Pthread_mutex_unlock
会对 mutex 解锁并且释放一个正在等待的线程。
除了互斥量以外,Pthreads
还提供了第二种同步机制:条件变量(condition variables)
。mutex 可以很好的允许或阻止对关键区域的访问。条件变量允许线程由于未满足某些条件而阻塞。绝大多数情况下这两种方法是一起使用的。下面我们进一步来研究线程、互斥量、条件变量之间的关联。
下面再来重新认识一下生产者和消费者问题:一个线程将东西放在一个缓冲区内,由另一个线程将它们取出。如果生产者发现缓冲区没有空槽可以使用了,生产者线程会阻塞起来直到有一个线程可以使用。生产者使用 mutex 来进行原子性检查从而不受其他线程干扰。但是当发现缓冲区已经满了以后,生产者需要一种方法来阻塞自己并在以后被唤醒。这便是条件变量做的工作。
下面是一些与条件变量有关的最重要的 pthread 调用
上表中给出了一些调用用来创建和销毁条件变量。条件变量上的主要属性是 Pthread_cond_wait
和 Pthread_cond_signal
。前者阻塞调用线程,直到其他线程发出信号为止(使用后者调用)。阻塞的线程通常需要等待唤醒的信号以此来释放资源或者执行某些其他活动。只有这样阻塞的线程才能继续工作。条件变量允许等待与阻塞原子性的进程。Pthread_cond_broadcast
用来唤醒多个阻塞的、需要等待信号唤醒的线程。
“需要注意的是,条件变量(不像是信号量)不会存在于内存中。如果将一个信号量传递给一个没有线程等待的条件变量,那么这个信号就会丢失,这个需要注意
管程
为了能够编写更加准确无误的程序,Brinch Hansen 和 Hoare 提出了一个更高级的同步原语叫做 管程(monitor)
。管程有一个很重要的特性,即在任何时候管程中只能有一个活跃的进程,这一特性使管程能够很方便的实现互斥操作。管程是编程语言的特性,所以编译器知道它们的特殊性,因此可以采用与其他过程调用不同的方法来处理对管程的调用。通常情况下,当进程调用管程中的程序时,该程序的前几条指令会检查管程中是否有其他活跃的进程。如果有的话,调用进程将被挂起,直到另一个进程离开管程才将其唤醒。如果没有活跃进程在使用管程,那么该调用进程才可以进入。
进入管程中的互斥由编译器负责,但是一种通用做法是使用 互斥量(mutex)
和 二进制信号量(binary semaphore)
。由于编译器而不是程序员在操作,因此出错的几率会大大降低。在任何时候,编写管程的程序员都无需关心编译器是如何处理的。他只需要知道将所有的临界区转换成为管程过程即可。绝不会有两个进程同时执行临界区中的代码。
即使管程提供了一种简单的方式来实现互斥,但在我们看来,这还不够。因为我们还需要一种在进程无法执行被阻塞。在生产者-消费者问题中,很容易将针对缓冲区满和缓冲区空的测试放在管程程序中,但是生产者在发现缓冲区满的时候该如何阻塞呢?
解决的办法是引入条件变量(condition variables)
以及相关的两个操作 wait
和 signal
。当一个管程程序发现它不能运行时(例如,生产者发现缓冲区已满),它会在某个条件变量(如 full)上执行 wait
操作。这个操作造成调用进程阻塞,并且还将另一个以前等在管程之外的进程调入管程。在前面的 pthread 中我们已经探讨过条件变量的实现细节了。另一个进程,比如消费者可以通过执行 signal
来唤醒阻塞的调用进程。
通过临界区自动的互斥,管程比信号量更容易保证并行编程的正确性。但是管程也有缺点,我们前面说到过管程是一个编程语言的概念,编译器必须要识别管程并用某种方式对其互斥作出保证。C、Pascal 以及大多数其他编程语言都没有管程,所以不能依靠编译器来遵守互斥规则。
与管程和信号量有关的另一个问题是,这些机制都是设计用来解决访问共享内存的一个或多个 CPU 上的互斥问题的。通过将信号量放在共享内存中并用 TSL
或 XCHG
指令来保护它们,可以避免竞争。但是如果是在分布式系统中,可能同时具有多个 CPU 的情况,并且每个 CPU 都有自己的私有内存呢,它们通过网络相连,那么这些原语将会失效。因为信号量太低级了,而管程在少数几种编程语言之外无法使用,所以还需要其他方法。
消息传递
上面提到的其他方法就是 消息传递(messaage passing)
。这种进程间通信的方法使用两个原语 send
和 receive
,它们像信号量而不像管程,是系统调用而不是语言级别。示例如下
send(destination, &message);
receive(source, &message);
send 方法用于向一个给定的目标发送一条消息,receive 从一个给定的源接收一条消息。如果没有消息,接受者可能被阻塞,直到接收一条消息或者带着错误码返回。
消息传递系统现在面临着许多信号量和管程所未涉及的问题和设计难点,尤其对那些在网络中不同机器上的通信状况。例如,消息有可能被网络丢失。为了防止消息丢失,发送方和接收方可以达成一致:一旦接受到消息后,接收方马上回送一条特殊的 确认(acknowledgement)
消息。如果发送方在一段时间间隔内未收到确认,则重发消息。
现在考虑消息本身被正确接收,而返回给发送着的确认消息丢失的情况。发送者将重发消息,这样接受者将收到两次相同的消息。
对于接收者来说,如何区分新的消息和一条重发的老消息是非常重要的。通常采用在每条原始消息中嵌入一个连续的序号来解决此问题。如果接受者收到一条消息,它具有与前面某一条消息一样的序号,就知道这条消息是重复的,可以忽略。
消息系统还必须处理如何命名进程的问题,以便在发送或接收调用中清晰的指明进程。身份验证(authentication)
也是一个问题,比如客户端怎么知道它是在与一个真正的文件服务器通信,从发送方到接收方的信息有可能被中间人所篡改。
屏障
最后一个同步机制是准备用于进程组而不是进程间的生产者-消费者情况的。在某些应用中划分了若干阶段,并且规定,除非所有的进程都就绪准备着手下一个阶段,否则任何进程都不能进入下一个阶段,可以通过在每个阶段的结尾安装一个 屏障(barrier)
来实现这种行为。当一个进程到达屏障时,它会被屏障所拦截,直到所有的屏障都到达为止。屏障可用于一组进程同步,如下图所示
在上图中我们可以看到,有四个进程接近屏障,这意味着每个进程都在进行运算,但是还没有到达每个阶段的结尾。过了一段时间后,A、B、D 三个进程都到达了屏障,各自的进程被挂起,但此时还不能进入下一个阶段呢,因为进程 B 还没有执行完毕。结果,当最后一个 C 到达屏障后,这个进程组才能够进入下一个阶段。
避免锁:读-复制-更新
最快的锁是根本没有锁。问题在于没有锁的情况下,我们是否允许对共享数据结构的并发读写进行访问。答案当然是不可以。假设进程 A 正在对一个数字数组进行排序,而进程 B 正在计算其平均值,而此时你进行 A 的移动,会导致 B 会多次读到重复值,而某些值根本没有遇到过。
然而,在某些情况下,我们可以允许写操作来更新数据结构,即便还有其他的进程正在使用。窍门在于确保每个读操作要么读取旧的版本,要么读取新的版本,例如下面的树
上面的树中,读操作从根部到叶子遍历整个树。加入一个新节点 X 后,为了实现这一操作,我们要让这个节点在树中可见之前使它"恰好正确":我们对节点 X 中的所有值进行初始化,包括它的子节点指针。然后通过原子写操作,使 X 称为 A 的子节点。所有的读操作都不会读到前后不一致的版本
在上面的图中,我们接着移除 B 和 D。首先,将 A 的左子节点指针指向 C 。所有原本在 A 中的读操作将会后续读到节点 C ,而永远不会读到 B 和 D。也就是说,它们将只会读取到新版数据。同样,所有当前在 B 和 D 中的读操作将继续按照原始的数据结构指针并且读取旧版数据。所有操作均能正确运行,我们不需要锁住任何东西。而不需要锁住数据就能够移除 B 和 D 的主要原因就是 读-复制-更新(Ready-Copy-Update,RCU)
,将更新过程中的移除和再分配过程分离开。
Java 并发工具包
JDK 1.5 提供了许多种并发容器来改进同步容器的性能,同步容器将所有对容器状态的访问都串行化,以实现他们之间的线程安全性。这种方法的代价是严重降低了并发性能,当多个线程争抢容器锁的同时,严重降低吞吐量。
下面我们就来一起认识一下 Java 中都用了哪些并发工具
Java 并发工具综述
在 Java 5.0 中新增加了 ConcurrentHashMap
用来替代基于散列的 Map 容器;新增加了 CopyOnWriteArrayList
和 CopyOnWriteArraySet
来分别替代 ArrayList 和 Set 接口实现类;还新增加了两种容器类型,分别是 Queue
和 BlockingQueue
, Queue 是队列的意思,它有一些实现分别是传统的先进先出队列 ConcurrentLinkedQueue
以及并发优先级队列 PriorityQueue
。Queue 是一个先入先出的队列,它的操作不会阻塞,如果队列为空那么获取元素的操作会返回空值。PriorityQueue 扩展了 Queue,增加了可阻塞的插入和获取等操作。如果队列为空,那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素为止。如果队列已满,那么插入操作则一直阻塞,直到队列中有可用的空间为止。
Java 6.0 还引入了 ConcurrentSkipListMap
和 ConcurrentSkipListSet
分别作为同步的 SortedMap 和 SortedSet 的并发替代品。下面我们就展开探讨了,设计不到底层源码,因为本篇文章主要目的就是为了描述一下有哪些东西以及用了哪些东西。
ConcurrentHashMap
我们先来看一下 ConcurrentHashMap 在并发集合中的位置
可以看到,ConcurrentHashMap 继承了 AbstractMap
接口并实现了 ConcurrentMap 和 Serializable 接口,AbstractMap 和 ConcurrentMap 都是 Map 的实现类,只不过 AbstractMap 是抽象实现。
ConcurrentHashMap 和 Hashtable 的构造非常相似,只不过 Hashtable 容器在激烈竞争的场景中会表现出效率低下的现象,这是因为所有访问 Hashtable 的线程都想获取同一把锁,如果容器里面有多把锁,并且每一把锁都只用来锁定一段数据,那么当多个线程访问不同的数据段时,就不存在竞争关系。这就是 ConcurreentHashMap 采用的 分段锁
实现。在这种锁实现中,任意数量的读取线程可以并发的访问 Map,执行读取操作的线程和执行写入的线程可以并发的访问 Map,并且在读取的同时也可以并发修改 Map。
ConcurrentHashMap 分段锁实现带来的结果是,在并发环境下可以实现更高的吞吐量,在单线程环境下只损失非常小的性能。
你知道 HashMap 是具有 fail-fast 机制的,也就是说它是一种强一致性的集合,在数据不一致的情况下会抛出 ConcurrentModificationException
异常,而 ConcurrentHashMap 是一种 弱一致性
的集合,在并发修改其内部结构时,它不会抛出 ConcurrentModificationException 异常,弱一致性能够容忍并发修改。
在 HashMap 中,我们一般使用的 size、empty、containsKey 等方法都是标准方法,其返回的结果是一定的,包含就是包含,不包含就是不包含,可以作为判断条件;而 ConcurrentHashMap 中的这些方法只是参考方法,它不是一个 精确值
,像是 size、empty 这些方法在并发场景下用处很小,因为他们的返回值总是在不断变化,所以这些操作的需求就被弱化了。
在 ConcurrentHashMap 中没有实现对 Map 加锁从而实现独占访问。在线程安全的 Map 实现 Hashtable
和 Collections.synchronizedMap
中都实现了独占访问,因此只能单个线程修改 Map 。ConcurrentHashMap 与这些 Map 容器相比,具有更多的优势和更少的劣势,只有当需要独占访问的需求时才会使用 Hashtable 或者是 Collections.synchronizedMap ,否则其他并发场景下,应该使用 ConcurrentHashMap。
ConcurrentMap
ConcurrentMap 是一个接口,它继承了 Map 接口并提供了 Map 接口中四个新的方法,这四个方法都是 原子性
方法,进一步扩展了 Map 的功能。
public interface ConcurrentMap<K, V> extends Map<K, V>
// 仅当 key 没有相应的映射值时才插入
V putIfAbsent(K以上是关于工作三年该懂的Java并发编程(干货)的主要内容,如果未能解决你的问题,请参考以下文章