1.3.4 并发工具类CountDownLatch/Semaphore/CyclicBarrier/FutureTask
Posted yfzhou528
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1.3.4 并发工具类CountDownLatch/Semaphore/CyclicBarrier/FutureTask相关的知识,希望对你有一定的参考价值。
CountDownLatch的2个用途:
1. 所有线程都到达相同的起跑线后,再一起开始跑(并非同时开始,而是队列中一个唤醒另一个)【此情况需到达起跑线后再调用await()等待其他线程】
2. 所有线程都到达终点(执行完)后,再一起庆祝 (并非同时开始,而是队列中一个唤醒另一个)【此情况需到达起终点后再调用await()等待其他线程】
package com.study.concurrent_utils; import java.util.concurrent.CountDownLatch; public class Test_CountDownLatch /* * 没隔1s开启一个线程,共开启6个线程 * 若希望6个线程 同时 执行某一操作 * 可以用CountDownLatch实现 */ public static void test01() throws InterruptedException CountDownLatch ctl = new CountDownLatch(6); for (int i = 0; i < 6; i++) new Thread() @Override public void run() ctl.countDown(); try ctl.await(); // 6个线程都启动执行到此处时,打印如下 System.out.println("here I am..."); catch (InterruptedException e) e.printStackTrace(); .start(); Thread.sleep(1000L); /* * 开启6个线程,6个线程都执行完后,才执行某个操作 * 可以用CountDownLatch来实现 */ public static void test02() throws InterruptedException JamesCountDownLatch ctl = new JamesCountDownLatch(6); for (int i = 0; i < 6; i++) new Thread() @Override public void run() System.out.println("after print..."); ctl.countDown(); .start(); Thread.sleep(1000L); ctl.await(); // 6条线程都执行完后同时打印这句话 System.out.println("main thread do something ..."); public static void main(String args[]) throws InterruptedException test02();
手写CountDownLatch(基于AQS)
countDown()方法:释放共享锁,首先会尝试释放共享锁(其实际是做CAS操作将state减1,如果state减到了0,返回true),如果返回true,说明读锁已释放完,则将等待队列头部线程唤醒。
await()方法:获取共享锁,首先会尝试获取共享锁(其实际操作,获取并判断state值:return getState()==0 ? 1: -1;),若state不是0,即所有线程还没到齐,集体活动还不能开始,此时将其加入等待队列,并且开始自旋,不断判断自己是不是队列头部,即下一个开始跑的是不是自己,是的话就再次尝试获取共享锁,若失败就将自己挂起,若成功即从等待队列移除,并唤醒下一个要获取共享锁的线程。
package com.study.concurrent_utils; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class JamesCountDownLatch private Sync sync; public JamesCountDownLatch(int count) sync = new Sync(count); public void countDown() sync.releaseShared(1); public void await() sync.acquireShared(1); class Sync extends AbstractQueuedSynchronizer public Sync(int count) setState(count); @Override protected int tryAcquireShared(int arg) // 只有当state变为0时,加锁成功 return getState() == 0 ? 1 : -1; /* * countdown的方法 */ @Override protected boolean tryReleaseShared(int arg) for (;;) int c = getState(); if (c == 0) return false; int nextc = c - arg; // 用CAS操作,讲count减一 if (compareAndSetState(c, nextc)) // 当state=0时,释放锁成功,返回true return nextc == 0;
手写Semaphore(基于AQS)
package com.study.concurrent_utils; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class JamesSemaphore private Sync sync; public JamesSemaphore(int state) sync = new Sync(state); public void acquire() sync.acquireShared(1); public void release() sync.releaseShared(1); class Sync extends AbstractQueuedSynchronizer int state; public Sync(int state) this.state = state; @Override protected int tryAcquireShared(int arg) for (;;) int available = getState(); int remaining = available - arg; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; @Override protected boolean tryReleaseShared(int arg) for (;;) int current = getState(); int next = current + arg; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true;
package com.study.concurrent_utils; import java.util.concurrent.CyclicBarrier; public class TestCyclicBarrier public static void main(String[] args) throws InterruptedException CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() @Override public void run() System.out.println(">>>>3个已满,走起<<<"); ); for (int i = 0; i < 30; i++) new Thread(new Runnable() @Override public void run() try cyclicBarrier.await(); System.out.println(Thread.currentThread() + ":start..."); catch (Exception e) e.printStackTrace(); ).start(); Thread.sleep(1000L);
>>>>3个已满,走起<<< Thread[Thread-2,5,main]:start... Thread[Thread-0,5,main]:start... Thread[Thread-1,5,main]:start... >>>>3个已满,走起<<< Thread[Thread-5,5,main]:start... Thread[Thread-3,5,main]:start... Thread[Thread-4,5,main]:start... >>>>3个已满,走起<<< Thread[Thread-8,5,main]:start... Thread[Thread-6,5,main]:start... Thread[Thread-7,5,main]:start...
手写CyclicBarrier(基于ReentrantLock)
ReentrantLock的Condition就是一个等待队列,ReentrantLock是一个可重入锁
package com.study.concurrent_utils; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class JamesCyclicBarrier private final ReentrantLock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); // 记录当前这个批次有多少个 private int count = 0; // 记录批次的大小 private final int parties; // 分代 private Object generation = new Object(); public JamesCyclicBarrier(int parties) if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; // 进入下一个分代 public void nextGeneration() condition.signalAll(); count = 0; generation = new Object(); public void await() // 实现排队,需要将线程放到等待队列 // 还需要将线程挂起 // final ReentrantLock lock = this.lock; lock.lock(); try // 记录当前的generation,相当于记录当前批次的id final Object g = generation; int index = ++count; // 批次已经达到parties, if (index == parties) // 进入下一个批次 nextGeneration(); return; // 若未达到批次,就进入等待 for (;;) try condition.await(); catch (InterruptedException e) if (g != generation) return; finally lock.unlock();
Future/Runnable
package com.study.futuretask; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.locks.LockSupport; public class Demo3_CallableTest public static void main(String args[]) throws InterruptedException, ExecutionException CallTask cTask = new CallTask(); JamesFutureTask<String> fTask = new JamesFutureTask<String>(cTask); // 执行第一次 Thread th = new Thread(fTask); th.start(); System.out.println("begain to get..."); String result = fTask.get(); System.out.println(result); // 执行第二次,失败 Thread th1 = new Thread(fTask); th1.start(); class CallTask implements Callable<String> @Override public String call() throws Exception LockSupport.parkNanos(1000 * 1000 * 1000 * 5L); System.out.println("done..."); return "James";
手写FutureTask
package com.study.futuretask; import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; public class JamesFutureTask<T> implements Runnable // future只能执行一次 private volatile int state = NEW; private static final int NEW = 0; private static final int RUNNING = 1; private static final int FINISED = 2; public JamesFutureTask(Callable<T> task) this.callable = task; // 程序执行的结果 private T result; // 要自行的task Callable<T> callable; // 获取结果的线层等待队列 LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(100); // 执行当前FutureTask的线程,用CAS进行争抢 AtomicReference<Thread> runner = new AtomicReference<>(); @Override public void run() // 判断当前对象的状态,如果是New就执行,如果 if (state != NEW || !runner.compareAndSet(null, Thread.currentThread())) return; state = RUNNING; try result = callable.call(); catch (Exception e) e.printStackTrace(); finally state = FINISED; // 方法执行完,唤醒所有线程 while (true) Thread waiter = waiters.poll(); if (waiter == null) break; LockSupport.unpark(waiter); public T get() if (state != FINISED) waiters.offer(Thread.currentThread()); while (state != FINISED) LockSupport.park(); return result;
以上是关于1.3.4 并发工具类CountDownLatch/Semaphore/CyclicBarrier/FutureTask的主要内容,如果未能解决你的问题,请参考以下文章