多线程等待所有子线程执行完使用总结——CyclicBarrier使用和源码初步分析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程等待所有子线程执行完使用总结——CyclicBarrier使用和源码初步分析相关的知识,希望对你有一定的参考价值。

问题背景

我们在日常开发和学习过程中,经常会使用到多线程的场景,其中我们经常会碰到,我们代码需要等待某个或者多个线程执行完再开始执行,上一篇文章中(参考 https://blog.51cto.com/baorant24/6060871 ),我们介绍了CountDownLatch使用和源码初步分析,本文将介绍一种新的方案,CyclicBarrier类的使用。

问题分析

话不多说,直接上个demo,先看下CyclicBarrier的一般使用方法,代码如下:

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import java.util.concurrent.BrokenBarrierException
import java.util.concurrent.CyclicBarrier

class TestCyclicBarrierActivity : AppCompatActivity() 
    // 线程数
    private val threadSize = 5
    private var cb: CyclicBarrier? = null

    companion object 
        const val TAG = "TestCyclicBarrier"
    

    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_test_cyclic_barrier)

        cb = CyclicBarrier(threadSize) 
            // 裁判
            Log.d(TAG, "参赛者" + cb!!.parties + "个全部准备完毕 --> 各就各位,预备跑");
        

        for (i in 0 until threadSize) 
            AthleteThread().start()
        

        Log.d(TAG, "主线程不用等待,继续执行");
    

    inner class AthleteThread : Thread() 
        override fun run() 
            try 
                // 运动员
                Log.d(TAG, currentThread().name + "号选手准备好了")
                cb?.await()
                println(currentThread().name + "跑,跑,跑")
             catch (e: InterruptedException) 
                e.printStackTrace()
             catch (e: BrokenBarrierException) 
                e.printStackTrace()
            
        
    

运行结果如下: 运行结果分析: 在所有选手准备好之后开始发令起跑,这也就是我们所说的线程同步。它的特点是主线程不用等待,继续执行。

问题解决

上面通过demo了解了CyclicBarrier的一般使用方法,下面我们对CyclicBarrier的源码进行一个初步分析和了解。 (1)核心成员变量

// 使用ReentrantLock锁初始化锁,便于condition产生每组条件
private final ReentrantLock lock = new ReentrantLock();
// 循环锁的核心条件,依赖于ReentrantLock锁
private final Condition trip = lock.newCondition();
// 初始屏障数
private final int parties;
/ /额外的线程任务如每个线程都要执行共同的任务时使用
private final Runnable barrierCommand;
/**
* 主要是作为辅助标志
* 是否异常中断该组阻塞(BrokenBarrierException)和重置下组条件(调用breakBarrier方法)
*/
private Generation generation = new Generation();
// 该类为辅助内部类
private static class Generation 
  boolean broken = false;
 

(2)构造函数

/**
* CyclicBarrier设置屏障数,采用默认的barrierAction为null
*/
public CyclicBarrier(int parties) 
    this(parties, null);

/**
* CyclicBarrier屏障数、额外步骤内容参数去初始化屏障
*/
public CyclicBarrier(int parties, Runnable barrierAction) 
    if (parties <= 0) throw new IllegalArgumentException();
    // 设置初始化屏障数
    this.parties = parties;
    // 剩余屏障数
    this.count = parties;
    // 额外步骤内容
    this.barrierCommand = barrierAction;


(3)await方法(关键代码) java.util.concurrent.CyclicBarrier#await()

public int await() throws InterruptedException, BrokenBarrierException 
    try 
        return dowait(false, 0L);
     catch (TimeoutException toe) 
        throw new Error(toe); // cannot happen
    


java.util.concurrent.CyclicBarrier#dowait

/**
 * dowait主要是为CyclicBarrier普通等待和超时等待await服务
 * timed代表区分普通等待(false)和超时等待(true)、nanos代表超时时间
 */
  private int dowait(boolean timed, long nanos)
      throws InterruptedException,BrokenBarrierException,TimeoutException 
      // 内部锁,利用ReentrantLock进行初始化
      final ReentrantLock lock = this.lock;
      // 获取锁
      lock.lock();
      try 
          final Generation g = generation;
          if (g.broken)
              throw new BrokenBarrierException();
          if (Thread.interrupted()) 
              breakBarrier();
              throw new InterruptedException();
          
          int index = --count;// 当前剩余达到屏障值
          if (index == 0)   // 是否到达改组的唤醒值
              boolean ranAction = false;
              try 
                  final Runnable command = barrierCommand;
                  if (command != null)
                      command.run();// 执行补充任务
                  ranAction = true;// 防止程序出现异常,导致执行finally中逻辑
                  nextGeneration();// 正常唤醒当前组阻塞线程、重置下组初始值count
                  return 0;
               finally 
                  if (!ranAction)
                      breakBarrier();// 程序异常,需要唤醒当前组被阻塞线程和初始下组条件
              
          
          // for循环自旋,主要是未达到该组的屏障值进行阻塞
          for (;;) 
              try     
                  if (!timed) //普通阻塞
                      trip.await(); //调用基于ReentrantLock中产生的Condition条件中的await方法进行阻塞
                  else if (nanos > 0L) //超时阻塞
                      nanos = trip.awaitNanos(nanos);
               catch (InterruptedException ie) // try逻辑出现中断异常
                  if (g == generation && ! g.broken) // 异常辅助是否被改变过
                      breakBarrier();
                      throw ie;
                   else 
                      Thread.currentThread().interrupt();
                  
              
              if (g.broken)
                  throw new BrokenBarrierException();
              if (g != generation)
                  return index;//返回上组的剩余屏障数
              if (timed && nanos <= 0L) //超时阻塞await时且已超时执行
                  breakBarrier();//唤醒当前组阻塞线程和重置下组条件
                  throw new TimeoutException();
              
          
       finally 
          lock.unlock();//释放当前线程锁对象执行权
      
  

问题总结

我们在日常开发和学习过程中,经常会使用到多线程的场景,其中经常会碰到,我们代码需要等待某个或者多个线程执行完再开始执行,上一篇文章中,我们介绍了通过CountDownLatch类来实现,本文介绍了一种新的方案,CyclicBarrier类的使用,有兴趣的同学可以进一步深入研究。

以上是关于多线程等待所有子线程执行完使用总结——CyclicBarrier使用和源码初步分析的主要内容,如果未能解决你的问题,请参考以下文章

多线程等待所有子线程执行完使用总结——CyclicBarrier使用和源码初步分析

java 多线程 , 等待所有子线程都执行完后 , 在执行主线程(其中的一种 , 也是个人觉得最好用的一种)

如何使“主线程”等待“子线程”执行结束后再继续执行

如何使“主线程”等待“子线程”执行结束后再继续执行

C#多线程: 怎么知道 多个线程 执行完毕了?所有的线程执行完毕后 我要做处理

主线程啥都没做,就会等待子线程结束。这是为啥?