并发编程之CountDownLatch
Posted 沸羊羊一个
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程之CountDownLatch相关的知识,希望对你有一定的参考价值。
CountDownLatch
多线程控制工具类,用来控制线程等待,用于同步一个或多个任务,它等待其它任务完全后(即计数器为0)后在执行。
1、栗子
package package3;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* CountDownLatch:多线程控制工具类,用来控制线程等待,它可以让某一个线程等待知道倒计时结束,再开始执行
*/
public class CountDownLatchDemo
//实例化并指定计数个数
private static CountDownLatch countDown = new CountDownLatch(3);
public static Runnable r1 = new Runnable()
@Override
public void run()
try
Thread.sleep(1000);
System.out.println("r1 run");
countDown.countDown();//个数+1
catch (Exception e)
e.printStackTrace();
;
public static Runnable r2 = new Runnable()
@Override
public void run()
try
Thread.sleep(2000);
System.out.println("r2 run");
countDown.countDown();
catch (Exception e)
e.printStackTrace();
;
public static Runnable r3 = new Runnable()
@Override
public void run()
try
Thread.sleep(1000);
System.out.println("r3 run");
countDown.countDown();
catch (Exception e)
e.printStackTrace();
;
public static void main(String[] args) throws Exception
System.out.println("===begin=====");
ExecutorService exec = Executors.newFixedThreadPool(3);
exec.submit(r1);
exec.submit(r2);
exec.submit(r3);
//等待检查,等待三个线程都执行完成后,主线程才能继续执行
countDown.await();
System.out.println("====end=====");
exec.shutdown();
输出结果:
===begin=====
r1 run
r3 run
r2 run
====end=====
2、CountDownLatch类结构
- Sync :CountDownLatch使用内部类Sync,而Sync继承AbstractQueuedSynchronizer,采用AQS构建同步器。
- CountDownLatch(int):构造器,初始化计数。
- await():当前线程一直等待,直到计数器为0才往下执行。
- await(long,TimeUnit):设置当前线程等待一段时间,时间一到,不管其它线程是否执行完成。
- getCount():获取当前计数器的值
3、源码分析
CountDownLatch底层采用AQS构建同步器,是共享锁机制,计算值则是通过 CAS 和 volatile 保证其原子性和可见性。
- 对于同步器AQS的详解,可参考AQS框架深入分析
- 线程独占锁和共享锁的说明,可参考java并发-独占锁与共享锁
3.1、内部类Sync
/**
* 对于countdownlatch同步控制。
* 使用 AQS 的状态代表计算值
*/
private static final class Sync extends AbstractQueuedSynchronizer
private static final long serialVersionUID = 4982264981922014374L;
//构造方法这,初始化状态(计算值)
Sync(int count)
setState(count);
//获取当前的计数值
int getCount()
return getState();
//在共享模式下尝试获取状态
protected int tryAcquireShared(int acquires)
return (getState() == 0) ? 1 : -1;
//在共享模式下尝试减量计数,并返回转换后的信号(是否转换到0)
protected boolean tryReleaseShared(int releases)
// Decrement count; signal when transition to zero
for (;;) //无限循环
int c = getState();//获取当前计数值
if (c == 0) //当前计数值为0 说明计算器已完成,没有线程在占用,返回false(不需要调用 doReleaseShared()去释放所有await线程)
return false;
int nextc = c-1;// 下一个状态
if (compareAndSetState(c, nextc))// 一直循环比较,直到获得锁后设置,设置成功返回true
return nextc == 0;//下一个状态nextc成功设置为0,返回true(后面调用 doReleaseShared()去释放所有await线程);否则返回false(不释放),其它线程再继续减量计数
3.2、CountDownLatch(n)构造器
CountDownLatch构造函数调用内部Sync类构造函数,而Sync继承AQS(AbstractQueuedSynchronizer)同步器,利用AQS的state机制使计算值同步共享状态,保证其原子性和可见性。
源码:
/**
* volatile 原子性、可见性的同步状态.
*/
private volatile int state;
/**
* 设置同步状态的值
* @param newState
*/
protected final void setState(int newState)
state = newState;
3.3、countDown()
countDown方法大致流程如下:
源码分析:
1、调用内部类Sync releaseShared方法,共享模式下释放资源。
2、调用tryReleaseShared方法尝试释放资源,如果能成功释放后返回true,执行AQS的doReleaseShared释放await线程。如果false,说明资源已不需要释放。
AQS doReleaseShared源码如下:
/**
* 共享模式的释放(唤醒)操作
*/
private void doReleaseShared()
//无限循环
for (;;)
//唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
//其实就是唤醒上面新获取到共享锁的节点的后继节点
Node h = head;
if (h != null && h != tail) //节点不为空而且不是尾部节点
int ws = h.waitStatus;//获取节点的同步状态
//表示后继节点需要被唤醒
if (ws == Node.SIGNAL)
//这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
//cvs 把标记为设置为0,表示唤醒操作已经开始进行,提高并发环境下性能
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//执行唤醒操作
unparkSuccessor(h);
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head) //如果头结点没有发生变化,表示设置完成,退出循环
break;
3.4、await()
await大致流程:
1、await调用AQS的acquireSharedInterruptibly方法获取锁。
2、如果线程被中断,直接抛出异常。
3、获取当前状态(计数不为0则返回-1)。
4、获取资源。
AQS中acquireSharedInterruptibly和doAcquireSharedInterruptibly方法源码如下:
/**
共享模式获取锁
如果在获取锁的过程中线程被中断,则直接抛出中断异常
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException
if (Thread.interrupted()) //如果线程被中断,直接抛出异常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//在共享模式下尝试获取状态,如果计数不为0则返回-1
//在可中断模式下获取资源
doAcquireSharedInterruptibly(arg);
/**
* 在可中断模式下获取资源
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException
//创建共享节点,加入队列尾部
final Node node = addWaiter(Node.SHARED);
//锁资源获取失败标记位
boolean failed = true;
try
for (;;)
//获取当前节点的前置节点
final Node p = node.predecessor();
//如果前置节点就是头结点,则尝试获取锁资源,如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
if (p == head)
//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
int r = tryAcquireShared(arg);
if (r >= 0)
//将head指向自己,还有剩余资源可以再唤醒之后的线程
setHeadAndPropagate(node, r);
// setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
p.next = null; // help GC
//表示锁资源成功获取,因此把failed置为false
failed = false;
return;
//判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
finally
if (failed)
//最后会分析获取锁失败处理逻辑
cancelAcquire(node);
4.4、CountDownLatch类源码如下:
package java.util.concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class CountDownLatch
/**
* 对于countdownlatch同步控制。
* 使用AQS状态代表数
*/
private static final class Sync extends AbstractQueuedSynchronizer
private static final long serialVersionUID = 4982264981922014374L;
//构造方法这,初始化状态(计算值)
Sync(int count)
setState(count);
//获取当前的计数值
int getCount()
return getState();
//在共享模式下尝试获取状态
protected int tryAcquireShared(int acquires)
return (getState() == 0) ? 1 : -1;
//在共享模式下尝试减量计数,并返回转换后的信号(是否转换到0)
protected boolean tryReleaseShared(int releases)
// Decrement count; signal when transition to zero
for (;;) //无限循环
int c = getState();//获取当前计数值
if (c == 0) //当前计数值为0 说明计算器已完成,没有线程在占用,返回false(不需要调用 doReleaseShared()去释放所有await线程)
return false;
int nextc = c-1;// 下一个状态
if (compareAndSetState(c, nextc))// 一直循环比较,直到获得锁后设置,设置成功返回true
return nextc == 0;//下一个状态nextc成功设置为0,返回true(后面调用 doReleaseShared()去释放所有await线程);否则返回false(不释放),其它线程再继续减量计数
private final Sync sync;
public CountDownLatch(int count)
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
//线程等待,直到计数器为0
public void await() throws InterruptedException
sync.acquireSharedInterruptibly(1);//共享模式获取锁,如果在获取锁的过程中线程被中断,则直接抛出中断异常
/**
* 线程指定时间内等待
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));//试在指定纳秒时间内获取共享锁,如果被中断则抛出中断异常
/**
* 计算值-1
*/
public void countDown()
sync.releaseShared(1);//释放锁资源
/**
* 获取当前的计数值
*/
public long getCount()
return sync.getCount();
public String toString()
return super.toString() + "[Count = " + sync.getCount() + "]";
以上是关于并发编程之CountDownLatch的主要内容,如果未能解决你的问题,请参考以下文章
java并发编程之CountDownLatch与CyclicBarrier
并发编程-AQS同步组件之CountDownLatch 闭锁