1.3.4 并发工具类CountDownLatch/Semaphore/CyclicBarrier/FutureTask

Posted yfzhou528

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1.3.4 并发工具类CountDownLatch/Semaphore/CyclicBarrier/FutureTask相关的知识,希望对你有一定的参考价值。

技术图片

CountDownLatch的2个用途:

1. 所有线程都到达相同的起跑线后,再一起开始跑(并非同时开始,而是队列中一个唤醒另一个)【此情况需到达起跑线后再调用await()等待其他线程】

2. 所有线程都到达终点(执行完)后,再一起庆祝 (并非同时开始,而是队列中一个唤醒另一个)【此情况需到达起终点后再调用await()等待其他线程】

package com.study.concurrent_utils;

import java.util.concurrent.CountDownLatch;

public class Test_CountDownLatch 

	/*
	 * 没隔1s开启一个线程,共开启6个线程 
	 * 若希望6个线程 同时 执行某一操作 
	 * 可以用CountDownLatch实现
	 */
	public static void test01() throws InterruptedException 
		CountDownLatch ctl = new CountDownLatch(6);

		for (int i = 0; i < 6; i++) 
			new Thread() 
				@Override
				public void run() 
					ctl.countDown();
					try 
						ctl.await();
						// 6个线程都启动执行到此处时,打印如下
						System.out.println("here I am...");
					 catch (InterruptedException e) 
						e.printStackTrace();
					
				
			.start();
			Thread.sleep(1000L);
		
	

	/*
	 * 开启6个线程,6个线程都执行完后,才执行某个操作 
	 * 可以用CountDownLatch来实现
	 */
	public static void test02() throws InterruptedException 
		JamesCountDownLatch ctl = new JamesCountDownLatch(6);

		for (int i = 0; i < 6; i++) 
			new Thread() 
				@Override
				public void run() 
					System.out.println("after print...");
					ctl.countDown();
				
			.start();
			Thread.sleep(1000L);
		

		ctl.await();
		// 6条线程都执行完后同时打印这句话
		System.out.println("main thread do something ...");

	

	public static void main(String args[]) throws InterruptedException 
		test02();
	

 手写CountDownLatch(基于AQS)

countDown()方法:释放共享锁,首先会尝试释放共享锁(其实际是做CAS操作将state减1,如果state减到了0,返回true),如果返回true,说明读锁已释放完,则将等待队列头部线程唤醒。

await()方法:获取共享锁,首先会尝试获取共享锁(其实际操作,获取并判断state值:return getState()==0 ? 1: -1;),若state不是0,即所有线程还没到齐,集体活动还不能开始,此时将其加入等待队列,并且开始自旋,不断判断自己是不是队列头部,即下一个开始跑的是不是自己,是的话就再次尝试获取共享锁,若失败就将自己挂起,若成功即从等待队列移除,并唤醒下一个要获取共享锁的线程。

package com.study.concurrent_utils;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class JamesCountDownLatch 

	private Sync sync;

	public JamesCountDownLatch(int count) 
		sync = new Sync(count);
	

	public void countDown() 
		sync.releaseShared(1);
	

	public void await() 
		sync.acquireShared(1);
	

	class Sync extends AbstractQueuedSynchronizer 
		public Sync(int count) 
			setState(count);
		

		@Override
		protected int tryAcquireShared(int arg) 
			// 只有当state变为0时,加锁成功
			return getState() == 0 ? 1 : -1;
		

		/*
		 * countdown的方法
		 */
		@Override
		protected boolean tryReleaseShared(int arg) 
			for (;;) 
				int c = getState();
				if (c == 0)
					return false;
				int nextc = c - arg;
				// 用CAS操作,讲count减一
				if (compareAndSetState(c, nextc)) 
					// 当state=0时,释放锁成功,返回true
					return nextc == 0;
				
			
		
	

 技术图片

手写Semaphore(基于AQS)

package com.study.concurrent_utils;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class JamesSemaphore 

	private Sync sync;

	public JamesSemaphore(int state) 
		sync = new Sync(state);
	

	public void acquire() 
		sync.acquireShared(1);
	

	public void release() 
		sync.releaseShared(1);
	

	class Sync extends AbstractQueuedSynchronizer 
		int state;

		public Sync(int state) 
			this.state = state;
		

		@Override
		protected int tryAcquireShared(int arg) 
			for (;;) 
				int available = getState();
				int remaining = available - arg;
				if (remaining < 0 || compareAndSetState(available, remaining))
					return remaining;
			
		

		@Override
		protected boolean tryReleaseShared(int arg) 
			for (;;) 
				int current = getState();
				int next = current + arg;
				if (next < current) // overflow
					throw new Error("Maximum permit count exceeded");
				if (compareAndSetState(current, next))
					return true;
			
		
	

技术图片

package com.study.concurrent_utils;

import java.util.concurrent.CyclicBarrier;

public class TestCyclicBarrier 
	public static void main(String[] args) throws InterruptedException 
		CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() 
			@Override
			public void run() 
				System.out.println(">>>>3个已满,走起<<<");
			
		);

		for (int i = 0; i < 30; i++) 
			new Thread(new Runnable() 
				@Override
				public void run() 
					try 
						cyclicBarrier.await();
						System.out.println(Thread.currentThread() + ":start...");
					 catch (Exception e) 
						e.printStackTrace();
					
				
			).start();
			Thread.sleep(1000L);
		
	
>>>>3个已满,走起<<<
Thread[Thread-2,5,main]:start...
Thread[Thread-0,5,main]:start...
Thread[Thread-1,5,main]:start...
>>>>3个已满,走起<<<
Thread[Thread-5,5,main]:start...
Thread[Thread-3,5,main]:start...
Thread[Thread-4,5,main]:start...
>>>>3个已满,走起<<<
Thread[Thread-8,5,main]:start...
Thread[Thread-6,5,main]:start...
Thread[Thread-7,5,main]:start...

手写CyclicBarrier(基于ReentrantLock)

ReentrantLock的Condition就是一个等待队列,ReentrantLock是一个可重入锁

package com.study.concurrent_utils;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class JamesCyclicBarrier 

	private final ReentrantLock lock = new ReentrantLock();
	private final Condition condition = lock.newCondition();

	// 记录当前这个批次有多少个
	private int count = 0;

	// 记录批次的大小
	private final int parties;

	// 分代
	private Object generation = new Object();

	public JamesCyclicBarrier(int parties) 
		if (parties <= 0)
			throw new IllegalArgumentException();
		this.parties = parties;
	

	// 进入下一个分代
	public void nextGeneration() 
		condition.signalAll();
		count = 0;
		generation = new Object();
	

	public void await() 
		// 实现排队,需要将线程放到等待队列
		// 还需要将线程挂起
		//
		final ReentrantLock lock = this.lock;
		lock.lock();
		try 
			// 记录当前的generation,相当于记录当前批次的id
			final Object g = generation;

			int index = ++count;
			// 批次已经达到parties,
			if (index == parties) 
				// 进入下一个批次
				nextGeneration();
				return;
			

			// 若未达到批次,就进入等待
			for (;;) 
				try 
					condition.await();
				 catch (InterruptedException e) 

				
				if (g != generation) 
					return;
				
			

		 finally 
			lock.unlock();
		
	

 Future/Runnable

package com.study.futuretask;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.LockSupport;

public class Demo3_CallableTest 
	public static void main(String args[]) throws InterruptedException, ExecutionException 
		CallTask cTask = new CallTask();
		JamesFutureTask<String> fTask = new JamesFutureTask<String>(cTask);

		// 执行第一次
		Thread th = new Thread(fTask);
		th.start();

		System.out.println("begain to get...");
		String result = fTask.get();
		System.out.println(result);

		// 执行第二次,失败
		Thread th1 = new Thread(fTask);
		th1.start();
	


class CallTask implements Callable<String> 

	@Override
	public String call() throws Exception 
		LockSupport.parkNanos(1000 * 1000 * 1000 * 5L);
		System.out.println("done...");
		return "James";
	

 手写FutureTask

package com.study.futuretask;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class JamesFutureTask<T> implements Runnable 

	// future只能执行一次
	private volatile int state = NEW;
	private static final int NEW = 0;
	private static final int RUNNING = 1;
	private static final int FINISED = 2;

	public JamesFutureTask(Callable<T> task) 
		this.callable = task;
	

	// 程序执行的结果
	private T result;

	// 要自行的task
	Callable<T> callable;

	// 获取结果的线层等待队列
	LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(100);

	// 执行当前FutureTask的线程,用CAS进行争抢
	AtomicReference<Thread> runner = new AtomicReference<>();

	@Override
	public void run() 
		// 判断当前对象的状态,如果是New就执行,如果
		if (state != NEW || !runner.compareAndSet(null, Thread.currentThread()))
			return;
		state = RUNNING;

		try 
			result = callable.call();
		 catch (Exception e) 
			e.printStackTrace();
		 finally 
			state = FINISED;
		

		// 方法执行完,唤醒所有线程
		while (true) 
			Thread waiter = waiters.poll();
			if (waiter == null)
				break;
			LockSupport.unpark(waiter);
		

	

	public T get() 
		if (state != FINISED) 
			waiters.offer(Thread.currentThread());
		

		while (state != FINISED) 
			LockSupport.park();
		

		return result;
	

 

以上是关于1.3.4 并发工具类CountDownLatch/Semaphore/CyclicBarrier/FutureTask的主要内容,如果未能解决你的问题,请参考以下文章

并发工具类:倒计时器-CountDownLatch

Java并发之CountDownLatch工具类

并发工具类等待多线程的CountDownLatch

Java并发多线程编程——并发工具类CountDownLatch(线程计数器)

Java并发工具类:CountDownLatch

深入理解J.U.C并发工具类