Java并发包中线程并发器
Posted FFStayF
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发包中线程并发器相关的知识,希望对你有一定的参考价值。
一、CountDownLatch
场景:主线程需要等待所有子线程执行完毕后再进行汇总
CountDownLatch实现比较简单,继承AQS实现了一个不可重入共享锁Sync
1.不可重入共享锁Sync
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); }
//尝试获取锁 仅state==0时才能获取成功 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
//尝试释放锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
2.方法
1)void await()
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);//尝试获取锁,不忽略中断引起的返回 }
2)boolean await(long timeout, TimeUnit unit)
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));//尝试一定时间内获取锁 }
3)void countDown()
public void countDown() { sync.releaseShared(1); }
3.实例
public class CountDownLatchTest { //定义CountDownLatch 实际创建共享锁 且锁已被两个线程持有 state == 2 private static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); pool.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println("childThreadOne over"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //线程1释放共享锁,state-- countDownLatch.countDown(); } } }); pool.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println("childThreadTwo over"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //线程2释放共享锁,state-- countDownLatch.countDown(); } } }); System.out.println("wait all child thread over"); //主线程阻塞, 实际尝试获取共享锁 ,仅state == 0时获取成功或被中断打断引起异常 countDownLatch.await(); System.out.println("all child thread over"); pool.shutdown(); } }
二、CyclicBarrier回环屏障
和CountDownLatch场景一样,但是CountDownLatch是一次性的,CyclicBarrier可重复使用;实现方式不同,所以使用方式不同,范围更大,见后面实例
CyclicBarrier采用独占锁ReentranLock及条件变量trip(阻塞到达屏障的线程)实现
设置一道屏障,①当线程数小于屏障规定的线程数时,线程入trip条件阻塞队列,线程阻塞;②当线程数等于屏障规定的线程数时,唤醒trip中所有的线程,并重置计数器状态(越过屏障)
另外CyclicBarrier也不忽略中断引起的返回,会抛出异常,屏障会失效,抛错genetation.barrier = true
1)变量与构造方法
/** 独占锁 */ private final ReentrantLock lock = new ReentrantLock(); /** 条件变量 */ private final Condition trip = lock.newCondition(); /** 屏障阻塞的线程个数 */ private final int parties; /* 突破屏障后执行的任务 默认为空 */ private final Runnable barrierCommand; /** 默认false,当前屏障被中断打破后,设置为true,继续使用屏障会抛出异常BrokenBarrierException */ private Generation generation = new Generation(); /** * 实际计数器 count == 0时突破屏障 */ private int count; public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
2.方法
1)int dowait(boolean timed, long nanos)
/** * 主要代码 */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) //中断引起的跨过屏障,后续await屏障都会抛错 throw new BrokenBarrierException(); if (Thread.interrupted()) { //当前线程被中断,唤醒trip的所有阻塞线程,设置g.broken == true,抛出异常 breakBarrier(); throw new InterruptedException(); } //调用一次数据器-1 int index = --count; //当计数器 == 0时,达到屏蔽的线程数,越过屏障 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) //先执行屏障任务 command.run(); ranAction = true; //唤醒条件变量中所有线程trip.signalAll(); //重置计数器count = parties; //重置版本generation = new Generation(); nextGeneration(); return 0; } finally { if (!ranAction) //执行屏障任务抛错时, //依然唤醒所有阻塞线程, //但设置g.barrier == true,后续屏障都会抛错 breakBarrier(); } } // 当计数器 != 0 时,当前线程入条件阻塞队列 for (;;) { try { if (!timed) //无限阻塞 trip.await(); else if (nanos > 0L) //超时阻塞 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
2)int await()
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
3) int await(long timeout, TimeUnit unit)
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
3.实例
public class CyclicBarrierTest { //设置屏障线程数为2 state = 2 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args){ ExecutorService pool = Executors.newFixedThreadPool(2); pool.submit(new Runnable(){ @Override public void run() { try { System.out.println("thread1 step1"); //线程1入trip阻塞队列,state-- cyclicBarrier.await(); System.out.println("thread1 step2"); cyclicBarrier.await(); System.out.println("thread1 step3"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); pool.submit(new Runnable(){ @Override public void run() { try { System.out.println("thread2 step1"); //线程2入trip阻塞队列,state-- //与线程1的step1一起导致state == 0,越过屏障唤醒两个线程,state重新设置为2后续逻辑一致 cyclicBarrier.await(); System.out.println("thread2 step2"); cyclicBarrier.await(); System.out.println("thread2 step3"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); pool.shutdown(); } }
三、Semaphore
场景:与CountDownLatch一样
信号量同步器设计类似于CountDownLatch,不同的是计数器是递增的
Semaphore不仅实现了公平锁,还实现了非公平锁
1.共享锁Sync
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } /** * 非公平锁 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * 公平锁 */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
2实例
public class SemaphoreTest { //信号量 private static Semaphore semaphore = new Semaphore(0); public static void main(String[] args) throws InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); pool.submit(new Runnable() { @Override public void run() { try { System.out.println("thread1 over"); //释放+1 semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } }); pool.submit(new Runnable() { @Override public void run() { try { System.out.println("thread2 over"); //释放+1 semaphore.release(); } catch (Exception e) { e.printStackTrace(); } } }); //同步 semaphore.acquire(2); System.out.println("all child thread over"); pool.shutdown(); } }
四、总结
1.线程同步的设计类似《操作系统原理》中的进程同步,信号量机制,PV操作
2.CountDownLatch实现线程同步(计数器自减),是一次性的,仅支持公平锁,线程FIFO;
CyclicBarrier实现线程同步(计数器自减),是可复用的(计数器还原),使用独占锁ReentranLock的条件变量trip的阻塞队列实现。
Semaphore实现线程同步(计数器自增),也是可以复用的(计数器归0),提供公平锁与非公平锁实现。
以上是关于Java并发包中线程并发器的主要内容,如果未能解决你的问题,请参考以下文章