jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析

Posted Itzel_yuki

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析相关的知识,希望对你有一定的参考价值。

一、继承关系

public class CyclicBarrier


功能:

        实现n个线程(线程之间是独占锁的关系)相互等待,最后一个执行wait操作的线程需要执行指定的命令barrierCommand,并且还需要唤醒其他所有等待的线程,还要初始化相关参数,为下一轮相互等待做好准备。n个线程中任何一个线程发生中断,超时,则设置代标志generation.broken为true,其他线程检测到该标识说明此轮等待失败。

实现方式:

        使用的是AQS中独占锁CLH队列+CONDITION队列的实现方式。当调用barrier.await方法时,该线程要获得锁,因此进入CLH队列队尾,当获取到锁后,执行condition.wait操作时,将该线程节点从CLH队列队首删除,添加到CONDITION队列中。将count-1个线程以上述方式逐个添加到CONDITIO队列中,当最后一个线程获取到锁后,执行barrierCommand任务,然后执行condition.signalAll方法,唤醒CONDITION队列中所有节点,CONDITION队列从队首开始逐个将node添加到CLH队列中,开始获取锁,执行完后继操作后从barrier.await中返回。


说明:

        调用时不要求和lock结合使用,barrier内部的await方法中使用了lock和condition.await和signal操作。


二、成员变量

/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();//独占锁
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();//条件,lock中的一个CONDITION队列
/** The number of parties */
private final int parties;//参与相互等待的线程数量
/* The command to run when tripped */
private final Runnable barrierCommand;//最后一个执行dowait的线程需要执行的代码
/** The current generation */
private Generation generation = new Generation();//下一批循环等待的代标志

private int count;//参与等待的线程数量

三、构造方法

//指定参与相互等待的线程的数量parties,
	public CyclicBarrier(int parties, Runnable barrierAction) 
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    
	
	public CyclicBarrier(int parties) 
        this(parties, null);
    


四、内部类

(1)Generation

private static class Generation 
        boolean broken = false;
    

五、方法说明

(1)await:

public int await() throws InterruptedException, BrokenBarrierException 
        try 
            return dowait(false, 0L);
         catch (TimeoutException toe) 
            throw new Error(toe); // cannot happen
        
    
	
	private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException 
        final ReentrantLock lock = this.lock;
		//加锁
        lock.lock();
        try 
            final Generation g = generation;
			//g.broken==true表示这一轮相互等待的线程中,有线程抛出了异常,则该线程在此处也要抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
				
			//当前线程是否发生了中断,是则调用breakBarrier设置generation的broken值,以便通知其他相互等待的线程有线程发生异常。
            if (Thread.interrupted()) 
                breakBarrier();
                throw new InterruptedException();
            
			
            int index = --count;
            if (index == 0)   // 最后一个执行dowait的线程执行此处的代码
                boolean ranAction = false;
                try 
                    final Runnable command = barrierCommand;
					//最后一个到达的线程执行barrierCommand
                    if (command != null)
                        command.run();
                    ranAction = true;
					//该轮等待结束,可以开启下一轮等待,唤醒所有该轮中的阻塞线程
                    nextGeneration();
                    return 0;
                 finally 
				//如果ranAction为false,则该轮相互等待失败,调用breakBarrier通知
                    if (!ranAction)
                        breakBarrier();
                
            

            // 循环,直到该线程被唤醒,或者超时,或者发生中断
            for (;;) 
                try 
					//该线程执行await操作,释放该线程占有的锁,唤醒CLH队列中的后继节点
                    if (!timed)
                        trip.await();
					//awaitNanos:加入时间等待
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                 catch (InterruptedException ie) 
					//发生异常,若当前线程是该轮线程中第一个发生异常的线程,则执行breakBarrier通知其他线程
                    if (g == generation && ! g.broken) 
                        breakBarrier();
                        throw ie;
                     else 
                        //否则,该线程之前已经有线程发生异常,此处只需要中断当前线程即可
                        Thread.currentThread().interrupt();
                    
                
				//如果该轮相互等待失败,则抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
				//说明此轮等待已经结束
                if (g != generation)
                    return index;
				//超时,调用breakBarrier通知其他线程本轮相互等待失败
                if (timed && nanos <= 0L) 
                    breakBarrier();
                    throw new TimeoutException();
                
            
         finally 
			//释放锁
            lock.unlock();
        
    
	//这一轮相互等待过程中如果某个线程发生异常,则该轮等待的标志generation.broken设置为true,通知其他相互等待的线程有线程抛出异常
	private void breakBarrier() 
        generation.broken = true;
        count = parties;
		//唤醒CONDITION队列中所有线程
        trip.signalAll();
    
	//开启下一轮等待
	private void nextGeneration() 
        //唤醒上一轮等待中的所有线程
        trip.signalAll();
        // 重新初始化count和generation
        count = parties;
        generation = new Generation();
    

(2)await

public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException 
        return dowait(true, unit.toNanos(timeout));
    

(3)reset:重新初始化参数,开启新的一轮等待。

public void reset() 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try 
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
         finally 
            lock.unlock();
        
    

六、使用示例

class Persion extends Thread
	String name;
	CyclicBarrier barrier;
	int time;
	public Persion(String name,CyclicBarrier barrier,int time)
		super();
		this.name=name;
		this.barrier=barrier;
		this.time=time;
	
	@Override
	public void run() 
		LockSupport.parkNanos(time*10000);
		System.out.println(name+"来了,开始等待其他人。");
		try 
			barrier.await();
		 catch (InterruptedException | BrokenBarrierException e) 
			// TODO Auto-generated catch block
			e.printStackTrace();
		
		System.out.println(name+"发言...");
		
	

public static void testCyclicBarrier()
	CyclicBarrier barrier=new CyclicBarrier(5, new Runnable() 
		
		@Override
		public void run() 
			// TODO Auto-generated method stub
			System.out.println("所有人都到齐了,开始开会!");
			
		
	);
	Persion persion1=new Persion("张三", barrier, 1);
	Persion persion2=new Persion("李四", barrier, 2);
	Persion persion3=new Persion("王五", barrier, 3);
	Persion persion4=new Persion("赵六", barrier, 1);
	Persion persion5=new Persion("钱七", barrier, 2);
	persion1.start();
	persion2.start();
	persion3.start();
	persion4.start();
	persion5.start();

输出结果:

张三来了,开始等待其他人。
钱七来了,开始等待其他人。
赵六来了,开始等待其他人。
王五来了,开始等待其他人。
李四来了,开始等待其他人。
所有人都到齐了,开始开会!
李四发言...
张三发言...
赵六发言...
钱七发言...
王五发言...

以上是关于jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析的主要内容,如果未能解决你的问题,请参考以下文章

jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析

jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析

jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析

jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析

jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析

jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析