Java多线程中如何使用CountDownLatch?

Posted

技术标签:

【中文标题】Java多线程中如何使用CountDownLatch?【英文标题】:How is CountDownLatch used in Java Multithreading? 【发布时间】:2013-07-23 12:23:40 【问题描述】:

谁能帮我理解 Java CountDownLatch 是什么以及何时使用它?

我对这个程序的工作原理不是很清楚。据我了解,所有三个线程同时启动,每个线程将在 3000 毫秒后调用 CountDownLatch。所以倒计时会一一递减。锁存器变为零后,程序会打印“已完成”。也许我理解的方式是不正确的。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Processor implements Runnable 
    private CountDownLatch latch;

    public Processor(CountDownLatch latch) 
        this.latch = latch;
    

    public void run() 
        System.out.println("Started.");

        try 
            Thread.sleep(3000);
         catch (InterruptedException e) 
            e.printStackTrace();
        

        latch.countDown();
    

// --------------------------------------------- --------

public class App 

    public static void main(String[] args) 

        CountDownLatch latch = new CountDownLatch(3); // coundown from 3 to 0

        ExecutorService executor = Executors.newFixedThreadPool(3); // 3 Threads in pool

        for(int i=0; i < 3; i++) 
            executor.submit(new Processor(latch)); // ref to latch. each time call new Processes latch will count down by 1
        

        try 
            latch.await();  // wait until latch counted down to 0
         catch (InterruptedException e) 
            e.printStackTrace();
        

        System.out.println("Completed.");
    


【问题讨论】:

docs.oracle.com/javase/7/docs/api/java/util/concurrent/… 我刚刚将您的问题示例代码用于 android 并行服务批处理,它就像一个魅力。非常感谢! 从 2012 年的 this video 获得这里,这与此处显示的示例非常相似。对于任何感兴趣的人,这是来自一个名叫 John 的人的 Java 多线程教程系列的一部分。我喜欢约翰。强烈推荐。 【参考方案1】:

是的,你理解正确。 CountDownLatch 工作原理是闩锁,主线程会一直等到门打开。一个线程等待 n 个线程,在创建 CountDownLatch 时指定。

任何调用CountDownLatch.await() 的线程,通常是应用程序的主线程,都会等待计数达到零或被另一个线程中断。一旦完成或准备好,所有其他线程都需要通过调用CountDownLatch.countDown() 来倒计时。

一旦计数达到零,等待线程继续。 CountDownLatch 的缺点/优点之一是它不可重复使用:一旦计数达到零,您就不能再使用 CountDownLatch

编辑:

当一个线程(如主线程)需要等待一个或多个线程完成才能继续处理时,使用CountDownLatch

在 Java 中使用CountDownLatch 的经典示例是使用服务架构的服务器端核心 Java 应用程序,其中多个服务由多个线程提供,并且在所有服务都成功启动之前应用程序无法开始处理。

附: OP 的问题有一个非常简单的例子,所以我没有包括一个。

【讨论】:

感谢您的回复。你能给我一个应用倒计时闩锁的例子吗? CountDownLatch的使用教程在这里howtodoinjava.com/2013/07/18/… @NikolaB 但是在这个给定的例子中,我们可以通过使用 join 方法获得相同的结果,不是吗? 我认为不可重用性是一个优势:您确定没有人可以重置它或增加计数。 很好的解释。但我对One thread waits for n number of threads specified while creating CountDownLatch in Java这一点略有不同意见。如果你需要这样的机制,那么谨慎使用CyclicBarrierJava concurrency in Practice 中给出的这两者之间的基本概念区别是:Latches are for waiting for events; barriers are for waiting for other threadscyclicBarrier.await() 进入阻塞状态。【参考方案2】:

Java 中的CountDownLatch 是一种同步器,它允许一个Thread 在开始处理之前等待一个或多个Threads。

CountDownLatch 的工作原理是闩锁,线程将等待直到门打开。一个线程等待n 创建CountDownLatch 时指定的线程数。

例如final CountDownLatch latch = new CountDownLatch(3);

这里我们将计数器设置为 3。

调用CountDownLatch.await() 的任何线程,通常是应用程序的主线程,将等待计数达到零或被另一个Thread 中断。一旦完成或准备好工作,所有其他线程都需要通过调用CountDownLatch.countDown() 进行倒计时。一旦计数达到零,Thread 等待开始运行。

这里的计数被CountDownLatch.countDown() 方法递减。

调用await()方法的Thread会一直等到初始计数归零。

要使计数为零,其他线程需要调用countDown() 方法。 一旦计数变为零,调用await() 方法的线程将恢复(开始执行)。

CountDownLatch 的缺点是它不可重复使用:一旦计数变为零,它就不再可用。

【讨论】:

我们是否使用 new CountDownLatch(3),因为我们定义了来自 newFixedThreadPool 的 3 个线程? “开始处理之前”不应该是“继续处理之前”吗? @Arefe 是的,这是通过您的代码块的线程数【参考方案3】:

NikolaB 解释的很好,但是例子会有助于理解,所以这里是一个简单的例子...

 import java.util.concurrent.*;


  public class CountDownLatchExample 

  public static class ProcessThread implements Runnable 

    CountDownLatch latch;
    long workDuration;
    String name;

    public ProcessThread(String name, CountDownLatch latch, long duration)
        this.name= name;
        this.latch = latch;
        this.workDuration = duration;
    


    public void run() 
        try 
            System.out.println(name +" Processing Something for "+ workDuration/1000 + " Seconds");
            Thread.sleep(workDuration);
         catch (InterruptedException e) 
            e.printStackTrace();
        
        System.out.println(name+ "completed its works");
        //when task finished.. count down the latch count...

        // basically this is same as calling lock object notify(), and object here is latch
        latch.countDown();
    



public static void main(String[] args) 
    // Parent thread creating a latch object
    CountDownLatch latch = new CountDownLatch(3);

    new Thread(new ProcessThread("Worker1",latch, 2000)).start(); // time in millis.. 2 secs
    new Thread(new ProcessThread("Worker2",latch, 6000)).start();//6 secs
    new Thread(new ProcessThread("Worker3",latch, 4000)).start();//4 secs


    System.out.println("waiting for Children processes to complete....");
    try 
        //current thread will get notified if all chidren's are done 
        // and thread will resume from wait() mode.
        latch.await();
     catch (InterruptedException e) 
        e.printStackTrace();
    

    System.out.println("All Process Completed....");

    System.out.println("Parent Thread Resuming work....");



     
  

【讨论】:

【参考方案4】:

当我们想要等待多个线程完成其任务时使用它。类似于线程中的join。

我们可以在哪里使用 CountDownLatch

考虑一个场景,我们有三个线程“A”、“B”和“C”,并且我们希望仅在“A”和“B”线程完成或部分完成它们的线程时启动线程“C”任务。

可应用于现实世界的 IT 场景

假设经理在开发团队(A 和 B)之间划分模块,并且他希望仅在两个团队都完成任务时将其分配给 QA 团队进行测试。

public class Manager 
    public static void main(String[] args) throws InterruptedException 
        CountDownLatch countDownLatch = new CountDownLatch(2);
        MyDevTeam teamDevA = new MyDevTeam(countDownLatch, "devA");
        MyDevTeam teamDevB = new MyDevTeam(countDownLatch, "devB");
        teamDevA.start();
        teamDevB.start();
        countDownLatch.await();
        MyQATeam qa = new MyQATeam();
        qa.start();
       


class MyDevTeam extends Thread    
    CountDownLatch countDownLatch;
    public MyDevTeam (CountDownLatch countDownLatch, String name) 
        super(name);
        this.countDownLatch = countDownLatch;       
       
    @Override
    public void run() 
        System.out.println("Task assigned to development team " + Thread.currentThread().getName());
        try 
                Thread.sleep(2000);
         catch (InterruptedException ex) 
                ex.printStackTrace();
        
    System.out.println("Task finished by development team Thread.currentThread().getName());
            this.countDownLatch.countDown();
    


class MyQATeam extends Thread    
    @Override
    public void run() 
        System.out.println("Task assigned to QA team");
        try 
                Thread.sleep(2000);
         catch (InterruptedException ex) 
            ex.printStackTrace();
        
        System.out.println("Task finished by QA team");
    

上述代码的输出将是:

分配给开发团队 devB 的任务

分配给开发团队 devA 的任务

开发团队 devB 完成的任务

开发团队 devA 完成的任务

分配给 QA 团队的任务

QA 团队完成的任务

这里await()方法等待countdownlatch标志变为0,countDown()方法将countdownlatch标志减1。

JOIN 的限制: 上面的例子也可以用JOIN来实现,但是JOIN不能用于两种场景:

    当我们使用 ExecutorService 而不是 Thread 类来创建线程时。 修改上面的示例,其中经理希望在开发完成 80% 的任务后立即将代码移交给 QA 团队。这意味着 CountDownLatch 允许我们修改可用于等待另一个线程部分执行的实现。

【讨论】:

【参考方案5】:

CoundDownLatch 使您可以让一个线程等待,直到所有其他线程完成执行。

伪代码可以是:

// Main thread starts
// Create CountDownLatch for N threads
// Create and start N threads
// Main thread waits on latch
// N threads completes there tasks are returns
// Main thread resume execution

【讨论】:

您可能希望从代码块中移出所有描述 最好的评论。我喜欢这些“直截了当”的 cmets,而不是理论解释。【参考方案6】:

正如 JavaDoc (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html) 中提到的,CountDownLatch 是一种同步辅助,在 Java 5 中引入。这里的同步并不意味着限制对关键部分的访问。而是对不同线程的操作进行排序。 通过 CountDownLatch 实现的同步类型与 Join 类似。 假设有一个线程“M”需要等待其他工作线程“T1”、“T2”、“T3”完成其任务 在 Java 1.5 之前,可以这样做的方式是,M 运行以下代码

    T1.join();
    T2.join();
    T3.join();

上面的代码确保线程 M 在 T1、T2、T3 完成它的工作之后恢复它的工作。 T1、T2、T3可以任意顺序完成工作。 同样可以通过 CountDownLatch 实现,其中 T1、T2、T3 和线程 M 共享同一个 CountDownLatch 对象。 “M”请求: countDownLatch.await(); 其中 "T1","T2","T3" 是 countDownLatch.countdown();

join 方法的一个缺点是 M 必须知道 T1、T2、T3。如果稍后添加了新的工作线程 T4,那么 M 也必须意识到它。使用 CountDownLatch 可以避免这种情况。 执行后的动作顺序为[T1,T2,T3](T1,T2,T3的顺序可以是反正)-> [M]

【讨论】:

【参考方案7】:

使用 Java Simple Serial Connector 访问串行端口是何时使用此类功能的一个很好的例子。通常,您将向端口写入一些内容,并且异步地,在另一个线程上,设备将响应 SerialPortEventListener。通常,您需要在写入端口后暂停以等待响应。手动处理这种情况下的线程锁非常棘手,但使用 Countdownlatch 很容易。在你想你可以用另一种方式做之前,小心你从未想过的比赛条件!

伪代码:

CountDownLatch latch;
void writeData()  
   latch = new CountDownLatch(1);
   serialPort.writeBytes(sb.toString().getBytes())
   try 
      latch.await(4, TimeUnit.SECONDS);
     catch (InterruptedException e) 
   

class SerialPortReader implements SerialPortEventListener 
    public void serialEvent(SerialPortEvent event) 
        if(event.isRXCHAR())//If data is available
            byte buffer[] = serialPort.readBytes(event.getEventValue());
            latch.countDown();
         
     

【讨论】:

【参考方案8】:

如果您在调用latch.countDown() 之后添加一些调试,这可能会帮助您更好地理解它的行为。

latch.countDown();
System.out.println("DONE "+this.latch); // Add this debug

输出将显示 Count 正在递减。这个“计数”实际上是您启动的可运行任务(处理器对象)的数量,countDown() 被调用,因此在调用latch.await( )。

DONE java.util.concurrent.CountDownLatch@70e69696[Count = 2]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 1]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 0]

【讨论】:

【参考方案9】:

来自关于CountDownLatch的oracle文档:

一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

CountDownLatch 使用给定的计数进行初始化。 await 方法会阻塞,直到由于调用countDown() 方法而使当前计数达到零,之后所有等待的线程都被释放,任何后续的 await 调用都会立即返回。这是一次性现象——计数无法重置。

CountDownLatch 是一种多功能同步工具,可用于多种用途。

使用计数为 1 初始化的 CountDownLatch 用作简单的开/关锁存器或门:所有调用 await 的线程在门处等待,直到它被调用 countDown() 的线程打开。

初始化为 N 的CountDownLatch 可用于使一个线程等待,直到 N 个线程完成某个动作,或者某个动作已完成 N 次。

public void await()
           throws InterruptedException

导致当前线程等待直到锁存器倒计时到零,除非线程被中断。

如果当前计数为零,则此方法立即返回。

public void countDown()

减少锁存器的计数,如果计数达到零,则释放所有等待的线程。

如果当前计数大于零,则递减。如果新计数为零,则重新启用所有等待线程以进行线程调度。

你的例子的解释。

    您已将 latch 变量的计数设置为 3

    CountDownLatch latch = new CountDownLatch(3);
    

    您已将此共享latch 传递给工作线程:Processor

    Processor 的三个Runnable 实例已提交给ExecutorService executor

    主线程(App)正在等待计数变为零,下面的语句

     latch.await();  
    
    Processor 线程休眠 3 秒,然后使用 latch.countDown() 递减计数值

    由于latch.countDown(),第一个Process 实例将在完成后将锁存计数更改为2。

    由于latch.countDown(),第二个Process 实例将在完成后将锁存计数更改为1。

    由于latch.countDown(),第三个Process 实例将在完成后将锁存计数更改为0。

    锁存器计数为零导致主线程Appawait出来

    应用程序现在打印此输出:Completed

【讨论】:

【参考方案10】:

Java Doc 的这个例子帮助我清楚地理解了这些概念:

class Driver  // ...
  void main() throws InterruptedException 
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(N);

    for (int i = 0; i < N; ++i) // create and start threads
      new Thread(new Worker(startSignal, doneSignal)).start();

    doSomethingElse();            // don't let run yet
    startSignal.countDown();      // let all threads proceed
    doSomethingElse();
    doneSignal.await();           // wait for all to finish
  


class Worker implements Runnable 
  private final CountDownLatch startSignal;
  private final CountDownLatch doneSignal;
  Worker(CountDownLatch startSignal, CountDownLatch doneSignal) 
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
  
  public void run() 
     try 
       startSignal.await();
       doWork();
       doneSignal.countDown();
      catch (InterruptedException ex)  // return;
  

  void doWork()  ... 

视觉解读:

显然,CountDownLatch 允许一个线程(此处为 Driver)等到一堆正在运行的线程(此处为 Worker)执行完毕。

【讨论】:

【参考方案11】:
package practice;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample 

    public static void main(String[] args) throws InterruptedException 
        CountDownLatch c= new CountDownLatch(3);  // need to decrements the count (3) to zero by calling countDown() method so that main thread will wake up after calling await() method 
        Task t = new Task(c);
        Task t1 = new Task(c);
        Task t2 = new Task(c);
        t.start();
        t1.start();
        t2.start();
        c.await(); // when count becomes zero main thread will wake up 
        System.out.println("This will print after count down latch count become zero");
    


class Task extends Thread
    CountDownLatch c;

    public Task(CountDownLatch c) 
        this.c = c;
    

    @Override
    public void run() 
        try 
            System.out.println(Thread.currentThread().getName());
            Thread.sleep(1000);
            c.countDown();   // each thread decrement the count by one 
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

【讨论】:

【参考方案12】:

此链接CountDownLatchExample中解释的countDownLatch的最佳实时示例

【讨论】:

【参考方案13】:

最好的选择是CyclicBarrier,根据https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html 见:

CountDownLatch 使用给定的计数进行初始化。由于调用了 countDown() 方法,等待方法一直阻塞,直到当前计数达到零,之后所有等待的线程都被释放,任何后续的 await 调用立即返回。这是一次性现象——计数无法重置。如果您需要重置计数的版本,请考虑使用 CyclicBarrier。

【讨论】:

以上是关于Java多线程中如何使用CountDownLatch?的主要内容,如果未能解决你的问题,请参考以下文章

关于CountDownLatch控制线程的执行顺序

11.2-全栈Java笔记:Java中如何实现多线程

如何使用 Java 在多线程环境中测试某些东西 [重复]

Java多线程中如何使用CountDownLatch?

如何限制Java程序中的多线程?

如何解决java 多线程问题