Java并发编程实战14构建自定义同步工具(Building-Custom-Synchronizers)
Posted JavaEdge.
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程实战14构建自定义同步工具(Building-Custom-Synchronizers)相关的知识,希望对你有一定的参考价值。
JDK包含许多存在状态依赖的类,例如FutureTask、Semaphore和BlockingQueue,他们的一些操作都有前提条件,例如非空、任务已完成等。
创建状态依赖类的最简单的房就是在JDK提供了的状态依赖类基础上构造。例如ValueLactch,如果这些不满足,可以使用Java语言或者类库提供的底层机制来构造,包括
- 内置的条件队列
- condition
- AQS
这一章就介绍这些。
1 状态依赖性的管理 State Dependence
下一节会介绍使用条件队列解决阻塞线程运行。
下面先介绍通过轮询和休眠的方式(勉强)的解决。
标准模板:
void blockingAction() throws InterruptedException {
acquire lock on object state
while (precondition does not hold) {
release lock
wait until precondition might hold
optionally fail if interrupted or timeout expires
reacquire lock
}
perform action
}
看看阻塞有界队列的几种实现方式。依赖的前提条件:
- 不能从空缓存中获取元素
- 不能将元素放入已满的缓存中
不满足条件候,依赖状态的操作可以:
- 抛出异常
- 返回一个错误状态(码)
- 阻塞直到进入正确的状态
下面是基类,线程安全,但非阻塞。
@ThreadSafe
public abstract class BaseBoundedBuffer <V> {
@GuardedBy("this") private final V[] buf;
@GuardedBy("this") private int tail;
@GuardedBy("this") private int head;
@GuardedBy("this") private int count;
protected BaseBoundedBuffer(int capacity) {
this.buf = (V[]) new Object[capacity];
}
protected synchronized final void doPut(V v) {
buf[tail] = v;
if (++tail == buf.length)
tail = 0;
++count;
}
protected synchronized final V doTake() {
V v = buf[head];
buf[head] = null;
if (++head == buf.length)
head = 0;
--count;
return v;
}
public synchronized final boolean isFull() {
return count == buf.length;
}
public synchronized final boolean isEmpty() {
return count == 0;
}
}
“先检查再运行”的逻辑解决方案:
调用者必须自己处理前提条件失败的情况。当然也可以返回错误消息。
当然调用者可以不Sleep,而是直接重试,这就是忙等待或者自旋等待(busy waiting or spin waiting),如果换成很长时间都不变,那么这将会消耗大量的CPU时间!!!所以调用者自己休眠,sleep让出CPU。但这个时间就很尴尬:
- sleep长了万一一会前提条件就满足了岂不是白等了从而响应性低
- sleep短了浪费CPU时钟周期
另外可以试试yield,但是这也不靠谱。
@ThreadSafe
public class GrumpyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
public GrumpyBoundedBuffer() {
this(100);
}
public GrumpyBoundedBuffer(int size) {
super(size);
}
public synchronized void put(V v) throws BufferFullException {
if (isFull())
throw new BufferFullException();
doPut(v);
}
public synchronized V take() throws BufferEmptyException {
if (isEmpty())
throw new BufferEmptyException();
return doTake();
}
}
class ExampleUsage {
private GrumpyBoundedBuffer<String> buffer;
int SLEEP_GRANULARITY = 50;
void useBuffer() throws InterruptedException {
while (true) {
try {
String item = buffer.take();
// use item
break;
} catch (BufferEmptyException e) {
Thread.sleep(SLEEP_GRANULARITY);
}
}
}
}
优化让客户端舒服些:
@ThreadSafe
public class SleepyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
int SLEEP_GRANULARITY = 60;
public SleepyBoundedBuffer() {
this(100);
}
public SleepyBoundedBuffer(int size) {
super(size);
}
public void put(V v) throws InterruptedException {
while (true) {
synchronized (this) {
if (!isFull()) {
doPut(v);
return;
}
}
Thread.sleep(SLEEP_GRANULARITY);
}
}
public V take() throws InterruptedException {
while (true) {
synchronized (this) {
if (!isEmpty())
return doTake();
}
Thread.sleep(SLEEP_GRANULARITY);
}
}
}
这种方式测试失败,那么释放锁,让别人做,自己休眠下,然后再检测,不断的重复这个过程,当然可以解决,但还需要做权衡,CPU使用率与响应性之间的抉择。
那么我们想如果这种轮询和休眠的dummy方式不用,而是存在某种挂起线程的方案,并且这种方法能够确保当某个条件为 true 时,立刻唤醒线程,那将极大简化实现工作,这就是条件队列的实现。
Condition Queues的名字来源:
it gives a group of threads called the wait set a way to wait for a specific condition to become true. Unlike
typical queues in which the elements are data items, the elements of a
condition queue are the threads waiting for the condition.
每个Java对象都可以是一个锁,每个对象同样可以作为一个条件队列,并且Object的wait、notify和notifyAll就是内部条件队列的API。对象的内置锁(intrinsic lock )和内置条件队列是关联的,要调用X中的条件队列的任何一个方法,都必须持有对象X上的锁。
Object.wait
自动释放锁,并且请求os挂起当前线程,其他线程可以获得这个锁并修改对象状态。当被挂起的线程唤醒时,它将在返回之前重新获取锁。
@ThreadSafe
public class BoundedBuffer <V> extends BaseBoundedBuffer<V> {
// CONDITION PREDICATE: not-full (!isFull())
// CONDITION PREDICATE: not-empty (!isEmpty())
public BoundedBuffer() {
this(100);
}
public BoundedBuffer(int size) {
super(size);
}
// BLOCKS-UNTIL: not-full
public synchronized void put(V v) throws InterruptedException {
while (isFull())
wait();
doPut(v);
notifyAll();
}
// BLOCKS-UNTIL: not-empty
public synchronized V take() throws InterruptedException {
while (isEmpty())
wait();
V v = doTake();
notifyAll();
return v;
}
// BLOCKS-UNTIL: not-full
// Alternate form of put() using conditional notification
public synchronized void alternatePut(V v) throws InterruptedException {
while (isFull())
wait();
boolean wasEmpty = isEmpty();
doPut(v);
if (wasEmpty)
notifyAll();
}
}
注意,如果某个功能无法通过“轮询和休眠"来实现,那么条件队列也无法实现。
2 使用条件队列
2.1 条件谓词(The Condition Predicate)
条件谓词是使某个操作成为状态依赖操作的前提条件:
- take方法的条件谓词是”缓存不为空“,take方法在执行之前必须首先测试条件谓词
- put方法的条件谓词是”缓存不满“
在条件等待中存在一种重要的三元关系:
- 加锁
- wait方法
- 条件谓词
条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象和条件队列对象必须是同一个对象。
wait释放锁,线程挂起阻塞,等待直到超时,然后被另外一个线程中断或被通知唤醒。唤醒后,wait在返回前还需要重新获取锁,当线程从wait方法中唤醒,它在重新请求锁时不具有任何特殊的优先级,和其他人一起竞争。
2.2 过早唤醒
其他线程中间插足了,获取了锁,并且修改了遍历,这时候线程获取锁需要重新检查条件谓词。
wait block ----------race to get lock ------------------------------------------get lock -----
^
wait block --------> race to get lock ------get lock------> perform action ---> release lock
^
notifyAll
当然有时,比如一个你根本不知道为什么别人调用了notify或notifyAll,也许条件谓词压根就没满足,但线程还是获取了锁,然后test条件谓词,释放锁,其他线程都来了这么一趟,发生这就是“谎报军情”啊。
基于以上这两种情况,都必须重新测试条件谓词。
When using condition waits (Object.wait or Condition.await):
- Always have a condition predicate——some test of object state that must hold before proceeding;
- Always test the condition predicate before calling wait, and again after returning from wait;
- Always call wait in a loop;
- Ensure that the state variables making up the condition predicate are guarded by the lock associated with the condition queue;
- Hold the lock associated with the the condition queue when calling wait, notify, or notifyAll
- Do not release the lock after checking the condition predicate but before acting on it.
模板就是:
void stateDependentMethod() throws InterruptedException {
// condition predicate must be guarded by lock
synchronized(lock) {
while (!conditionPredicate()) //一定在循环里面做条件谓词
lock.wait(); //确保和synchronized的是一个对象
// object is now in desired state //不要释放锁
}
}
2.3 丢失的信号
保证notify一定在wait之后
2.4 通知
调用notify和notifyAll也得持有与条件队列对象相关联的锁。调用notify,JVM Thread Scheduler在这个条件队列上等待的多个线程中选择一个唤醒,而notifyAll则会唤醒所有线程。因此一旦notify了那么就需要尽快的释放锁,否则别人都竞争等着拿锁,都会进行blocked的状态,而不是线程挂起waiting状态,竞争都了不是好事,但是这是你考了性能因素和安全性因素的一个矛盾,具体问题要具体分析。
下面的方法可以进来减少竞争,但实现有些难写,所以还得折中考虑:
public synchronized void alternatePut(V v) throws InterruptedException {
while (isFull())
wait();
boolean wasEmpty = isEmpty();
doPut(v);
if (wasEmpty)
notifyAll();
}
使用notify容易丢失信号,所以大多数情况下用notifyAll,比如take notify,却通知了另外一个take,没有通知put,那么这就是信号丢失,是一种“被劫持的”信号。
因此只有满足下面两个条件,才能用notify,而不是notifyAll:
- 所有等待线程的类型都相同
- 单进单出
2.5 示例:阀门类A Gate Class
和第5章的那个TestHarness中使用CountDownLatch类似,完全可以使用wait/notifyAll做阀门。
@ThreadSafe
public class ThreadGate {
// CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n)
@GuardedBy("this") private boolean isOpen;
@GuardedBy("this") private int generation;
public synchronized void close() {
isOpen = false;
}
public synchronized void open() {
++generation;
isOpen = true;
notifyAll();
}
// BLOCKS-UNTIL: opened-since(generation on entry)
public synchronized void await() throws InterruptedException {
int arrivalGeneration = generation;
while (!isOpen && arrivalGeneration == generation)
wait();
}
}
3 Explicit Condition Objects
Lock是一个内置锁的替代,而Condition也是一种广义的内置条件队列。
Condition的API:
public interface Condition {
void await() throws InterruptedException;
boolean await(long time, TimeUnit unit)throws InterruptedException;
long awaitNanos(long nanosTimeout) throws InterruptedException;
void awaitUninterruptibly();
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
内置条件队列存在一些缺陷,每个内置锁都只能有一个
相关联的条件队列。所以在BoundedBuffer这种类中,多个线程可能在同一个条件队列上等待不同的条件谓词,所以notifyAll经常通知不是同一个类型的需求。如果想编写一个带有多个条件谓词的并发对象,或想获得除了条件队列可见性之外的更多的控制权,可以使用Lock和Condition,而非内置锁和条件队列,这更加灵活。
一个Condition和一个lock关联,想象一个条件队列和内置锁关联一样。在Lock上调用newCondition就可以新建无数个条件谓词,这些condition是可中断的、可有时间限制的,公平的或者非公平的队列操作。
The equivalents of wait, notify, and notifyAll for Condition objects are await, signal, and signalAll。
下面的例子就是改造后的BoundedBuffer:
@ThreadSafe
public class ConditionBoundedBuffer <T> {
protected final Lock lock = new ReentrantLock();
// CONDITION PREDICATE: notFull (count < items.length)
private final Condition notFull = lock.newCondition();
// CONDITION PREDICATE: notEmpty (count > 0)
private final Condition notEmpty = lock.newCondition();
private static final int BUFFER_SIZE = 100;
@GuardedBy("lock") private final T[] items = (T[]) new Object[BUFFER_SIZE];
@GuardedBy("lock") private int tail, head, count;
// BLOCKS-UNTIL: notFull
public void put(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[tail] = x;
if (++tail == items.length)
tail = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
// BLOCKS-UNTIL: notEmpty
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
T x = items[head];
items[head] = null;
if (++head == items.length)
head = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
注意这里使用了signal而不是signalll,能极大减少每次缓存操作中发生的上下文切换和锁请求次数。
使用condition和内置锁和条件队列一样,必须保卫在lock里面。
4 Synchronizer剖析
看似ReentrantLock和Semaphore功能很类似,每次只允许一定的数量线程通过,到达阀门时:
- 可以通过 lock或者acquire
- 等待,阻塞住了
- 取消tryLock,tryAcquire
- 可中断的,限时的
- 公平等待和非公平等待
下面的程序是使用Lock做一个Mutex也就是持有一个许可的Semaphore。
@ThreadSafe
public class SemaphoreOnLock {
private final Lock lock = new ReentrantLock();
// CONDITION PREDICATE: permitsAvailable (permits > 0)
private final Condition permitsAvailable = lock.newCondition();
@GuardedBy("lock") private int permits;
SemaphoreOnLock(int initialPermits) {
lock.lock();
try {
permits = initialPermits;
} finally {
lock.unlock();
}
}
// BLOCKS-UNTIL: permitsAvailable
public void acquire() throws InterruptedException {
lock.lock();
try {
while (permits <= 0)
permitsAvailable.await();
--permits;
} finally {
lock.unlock();
}
}
public void release() {
lock.lock();
try {
++permits;
permitsAvailable.signal();
} finally {
lock.unlock();
}
}
}
实际上很多J.U.C下面的类都是基于AbstractQueuedSynchronizer (AQS)构建的,例如CountDownLatch, ReentrantReadWriteLock, SynchronousQueue和FutureTask(java7之后不是了)。AQS解决了实现同步器时设计的大量细节问题,例如等待线程采用FIFO队列操作顺序。AQS不仅能极大极少实现同步器的工作量,并且也不必处理竞争问题,基于AQS构建只可能在一个时刻发生阻塞,从而降低上下文切换的开销,提高吞吐量。在设计AQS时,充分考虑了可伸缩性。
5 AbstractQueuedSynchronizer (AQS)
基于AQS构建的同步器类中,最进步的操作包括各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,并且通常会阻塞。
如果一个类想成为状态依赖的类,它必须拥有一些状态,AQS负责管理这些状态,通过getState,setState, compareAndSetState等protected类型方法进行操作。这是设计模式中的模板模式。
使用AQS的模板如下:
获取锁:首先判断当前状态是否允许获取锁,如果是就获取锁,否则就阻塞操作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败。另外如果是阻塞线程,那么线程就需要进入阻塞队列。当状态位允许获取锁时就修改状态,并且如果进了队列就从队列中移除。
释放锁:这个过程就是修改状态位,如果有线程因为状态位阻塞的话就唤醒队列中的一个或者更多线程。
boolean acquire() throws InterruptedException {
while (state does not permit acquire) {
if (blocking acquisition requested) {
enqueue current thread if not already queued
block current thread
}
else
return failure
}
possibly update synchronization state
dequeue thread if it was queued
return success
}
void release() {
update synchronization state
if (new state may permit a blocked thread to acquire)
unblock one or more queued threads
}
要支持上面两个操作就必须有下面的条件:
- 原子性操作同步器的状态位
- 阻塞和唤醒线程
- 一个有序的队列
1 状态位的原子操作
这里使用一个32位的整数来描述状态位,前面章节的原子操作的理论知识整好派上用场,在这里依然使用CAS操作来解决这个问题。事实上这里还有一个64位版本的同步器(AbstractQueuedLongSynchronizer),这里暂且不谈。
2 阻塞和唤醒线程
标准的JAVA API里面是无法挂起(阻塞)一个线程,然后在将来某个时刻再唤醒它的。JDK 1.0的API里面有Thread.suspend和Thread.resume,并且一直延续了下来。但是这些都是过时的API,而且也是不推荐的做法。
HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。
在JDK 5.0以后利用JNI在LockSupport类中实现了此特性。
LockSupport.park() LockSupport.park(Object) LockSupport.parkNanos(Object, long) LockSupport.parkNanos(long) LockSupport.parkUntil(Object, long) LockSupport.parkUntil(long) LockSupport.unpark(Thread)
上面的API中park()是在当前线程中调用,导致线
以上是关于Java并发编程实战14构建自定义同步工具(Building-Custom-Synchronizers)的主要内容,如果未能解决你的问题,请参考以下文章
深入了解Java并发——《Java Concurrency in Practice》14.构建自定义的同步工具