Java多线程中如何使用CountDownLatch?
Posted
技术标签:
【中文标题】Java多线程中如何使用CountDownLatch?【英文标题】:How is CountDownLatch used in Java Multithreading? 【发布时间】:2013-07-23 12:23:40 【问题描述】:谁能帮我理解 Java CountDownLatch
是什么以及何时使用它?
我对这个程序的工作原理不是很清楚。据我了解,所有三个线程同时启动,每个线程将在 3000 毫秒后调用 CountDownLatch。所以倒计时会一一递减。锁存器变为零后,程序会打印“已完成”。也许我理解的方式是不正确的。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class Processor implements Runnable
private CountDownLatch latch;
public Processor(CountDownLatch latch)
this.latch = latch;
public void run()
System.out.println("Started.");
try
Thread.sleep(3000);
catch (InterruptedException e)
e.printStackTrace();
latch.countDown();
// --------------------------------------------- --------
public class App
public static void main(String[] args)
CountDownLatch latch = new CountDownLatch(3); // coundown from 3 to 0
ExecutorService executor = Executors.newFixedThreadPool(3); // 3 Threads in pool
for(int i=0; i < 3; i++)
executor.submit(new Processor(latch)); // ref to latch. each time call new Processes latch will count down by 1
try
latch.await(); // wait until latch counted down to 0
catch (InterruptedException e)
e.printStackTrace();
System.out.println("Completed.");
【问题讨论】:
docs.oracle.com/javase/7/docs/api/java/util/concurrent/… 我刚刚将您的问题示例代码用于 android 并行服务批处理,它就像一个魅力。非常感谢! 从 2012 年的 this video 获得这里,这与此处显示的示例非常相似。对于任何感兴趣的人,这是来自一个名叫 John 的人的 Java 多线程教程系列的一部分。我喜欢约翰。强烈推荐。 【参考方案1】:是的,你理解正确。
CountDownLatch
工作原理是闩锁,主线程会一直等到门打开。一个线程等待 n 个线程,在创建 CountDownLatch
时指定。
任何调用CountDownLatch.await()
的线程,通常是应用程序的主线程,都会等待计数达到零或被另一个线程中断。一旦完成或准备好,所有其他线程都需要通过调用CountDownLatch.countDown()
来倒计时。
一旦计数达到零,等待线程继续。 CountDownLatch
的缺点/优点之一是它不可重复使用:一旦计数达到零,您就不能再使用 CountDownLatch
。
编辑:
当一个线程(如主线程)需要等待一个或多个线程完成才能继续处理时,使用CountDownLatch
。
在 Java 中使用CountDownLatch
的经典示例是使用服务架构的服务器端核心 Java 应用程序,其中多个服务由多个线程提供,并且在所有服务都成功启动之前应用程序无法开始处理。
附: OP 的问题有一个非常简单的例子,所以我没有包括一个。
【讨论】:
感谢您的回复。你能给我一个应用倒计时闩锁的例子吗? CountDownLatch的使用教程在这里howtodoinjava.com/2013/07/18/… @NikolaB 但是在这个给定的例子中,我们可以通过使用 join 方法获得相同的结果,不是吗? 我认为不可重用性是一个优势:您确定没有人可以重置它或增加计数。 很好的解释。但我对One thread waits for n number of threads specified while creating CountDownLatch in Java
这一点略有不同意见。如果你需要这样的机制,那么谨慎使用CyclicBarrier
。 Java concurrency in Practice
中给出的这两者之间的基本概念区别是:Latches are for waiting for events; barriers are for waiting for other threads
。 cyclicBarrier.await()
进入阻塞状态。【参考方案2】:
Java 中的CountDownLatch
是一种同步器,它允许一个Thread
在开始处理之前等待一个或多个Thread
s。
CountDownLatch
的工作原理是闩锁,线程将等待直到门打开。一个线程等待n
创建CountDownLatch
时指定的线程数。
例如final CountDownLatch latch = new CountDownLatch(3);
这里我们将计数器设置为 3。
调用CountDownLatch.await()
的任何线程,通常是应用程序的主线程,将等待计数达到零或被另一个Thread
中断。一旦完成或准备好工作,所有其他线程都需要通过调用CountDownLatch.countDown()
进行倒计时。一旦计数达到零,Thread
等待开始运行。
这里的计数被CountDownLatch.countDown()
方法递减。
调用await()
方法的Thread
会一直等到初始计数归零。
要使计数为零,其他线程需要调用countDown()
方法。
一旦计数变为零,调用await()
方法的线程将恢复(开始执行)。
CountDownLatch
的缺点是它不可重复使用:一旦计数变为零,它就不再可用。
【讨论】:
我们是否使用new CountDownLatch(3)
,因为我们定义了来自 newFixedThreadPool
的 3 个线程?
“开始处理之前”不应该是“继续处理之前”吗?
@Arefe 是的,这是通过您的代码块的线程数【参考方案3】:
NikolaB 解释的很好,但是例子会有助于理解,所以这里是一个简单的例子...
import java.util.concurrent.*;
public class CountDownLatchExample
public static class ProcessThread implements Runnable
CountDownLatch latch;
long workDuration;
String name;
public ProcessThread(String name, CountDownLatch latch, long duration)
this.name= name;
this.latch = latch;
this.workDuration = duration;
public void run()
try
System.out.println(name +" Processing Something for "+ workDuration/1000 + " Seconds");
Thread.sleep(workDuration);
catch (InterruptedException e)
e.printStackTrace();
System.out.println(name+ "completed its works");
//when task finished.. count down the latch count...
// basically this is same as calling lock object notify(), and object here is latch
latch.countDown();
public static void main(String[] args)
// Parent thread creating a latch object
CountDownLatch latch = new CountDownLatch(3);
new Thread(new ProcessThread("Worker1",latch, 2000)).start(); // time in millis.. 2 secs
new Thread(new ProcessThread("Worker2",latch, 6000)).start();//6 secs
new Thread(new ProcessThread("Worker3",latch, 4000)).start();//4 secs
System.out.println("waiting for Children processes to complete....");
try
//current thread will get notified if all chidren's are done
// and thread will resume from wait() mode.
latch.await();
catch (InterruptedException e)
e.printStackTrace();
System.out.println("All Process Completed....");
System.out.println("Parent Thread Resuming work....");
【讨论】:
【参考方案4】:当我们想要等待多个线程完成其任务时使用它。类似于线程中的join。
我们可以在哪里使用 CountDownLatch
考虑一个场景,我们有三个线程“A”、“B”和“C”,并且我们希望仅在“A”和“B”线程完成或部分完成它们的线程时启动线程“C”任务。
可应用于现实世界的 IT 场景
假设经理在开发团队(A 和 B)之间划分模块,并且他希望仅在两个团队都完成任务时将其分配给 QA 团队进行测试。
public class Manager
public static void main(String[] args) throws InterruptedException
CountDownLatch countDownLatch = new CountDownLatch(2);
MyDevTeam teamDevA = new MyDevTeam(countDownLatch, "devA");
MyDevTeam teamDevB = new MyDevTeam(countDownLatch, "devB");
teamDevA.start();
teamDevB.start();
countDownLatch.await();
MyQATeam qa = new MyQATeam();
qa.start();
class MyDevTeam extends Thread
CountDownLatch countDownLatch;
public MyDevTeam (CountDownLatch countDownLatch, String name)
super(name);
this.countDownLatch = countDownLatch;
@Override
public void run()
System.out.println("Task assigned to development team " + Thread.currentThread().getName());
try
Thread.sleep(2000);
catch (InterruptedException ex)
ex.printStackTrace();
System.out.println("Task finished by development team Thread.currentThread().getName());
this.countDownLatch.countDown();
class MyQATeam extends Thread
@Override
public void run()
System.out.println("Task assigned to QA team");
try
Thread.sleep(2000);
catch (InterruptedException ex)
ex.printStackTrace();
System.out.println("Task finished by QA team");
上述代码的输出将是:
分配给开发团队 devB 的任务
分配给开发团队 devA 的任务
开发团队 devB 完成的任务
开发团队 devA 完成的任务
分配给 QA 团队的任务
QA 团队完成的任务
这里await()方法等待countdownlatch标志变为0,countDown()方法将countdownlatch标志减1。
JOIN 的限制: 上面的例子也可以用JOIN来实现,但是JOIN不能用于两种场景:
-
当我们使用 ExecutorService 而不是 Thread 类来创建线程时。
修改上面的示例,其中经理希望在开发完成 80% 的任务后立即将代码移交给 QA 团队。这意味着 CountDownLatch 允许我们修改可用于等待另一个线程部分执行的实现。
【讨论】:
【参考方案5】:CoundDownLatch 使您可以让一个线程等待,直到所有其他线程完成执行。
伪代码可以是:
// Main thread starts
// Create CountDownLatch for N threads
// Create and start N threads
// Main thread waits on latch
// N threads completes there tasks are returns
// Main thread resume execution
【讨论】:
您可能希望从代码块中移出所有描述 最好的评论。我喜欢这些“直截了当”的 cmets,而不是理论解释。【参考方案6】:正如 JavaDoc (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html) 中提到的,CountDownLatch 是一种同步辅助,在 Java 5 中引入。这里的同步并不意味着限制对关键部分的访问。而是对不同线程的操作进行排序。 通过 CountDownLatch 实现的同步类型与 Join 类似。 假设有一个线程“M”需要等待其他工作线程“T1”、“T2”、“T3”完成其任务 在 Java 1.5 之前,可以这样做的方式是,M 运行以下代码
T1.join();
T2.join();
T3.join();
上面的代码确保线程 M 在 T1、T2、T3 完成它的工作之后恢复它的工作。 T1、T2、T3可以任意顺序完成工作。
同样可以通过 CountDownLatch 实现,其中 T1、T2、T3 和线程 M 共享同一个 CountDownLatch 对象。
“M”请求: countDownLatch.await();
其中 "T1","T2","T3" 是 countDownLatch.countdown();
join 方法的一个缺点是 M 必须知道 T1、T2、T3。如果稍后添加了新的工作线程 T4,那么 M 也必须意识到它。使用 CountDownLatch 可以避免这种情况。 执行后的动作顺序为[T1,T2,T3](T1,T2,T3的顺序可以是反正)-> [M]
【讨论】:
【参考方案7】:使用 Java Simple Serial Connector 访问串行端口是何时使用此类功能的一个很好的例子。通常,您将向端口写入一些内容,并且异步地,在另一个线程上,设备将响应 SerialPortEventListener。通常,您需要在写入端口后暂停以等待响应。手动处理这种情况下的线程锁非常棘手,但使用 Countdownlatch 很容易。在你想你可以用另一种方式做之前,小心你从未想过的比赛条件!
伪代码:
CountDownLatch latch;
void writeData()
latch = new CountDownLatch(1);
serialPort.writeBytes(sb.toString().getBytes())
try
latch.await(4, TimeUnit.SECONDS);
catch (InterruptedException e)
class SerialPortReader implements SerialPortEventListener
public void serialEvent(SerialPortEvent event)
if(event.isRXCHAR())//If data is available
byte buffer[] = serialPort.readBytes(event.getEventValue());
latch.countDown();
【讨论】:
【参考方案8】:如果您在调用latch.countDown() 之后添加一些调试,这可能会帮助您更好地理解它的行为。
latch.countDown();
System.out.println("DONE "+this.latch); // Add this debug
输出将显示 Count 正在递减。这个“计数”实际上是您启动的可运行任务(处理器对象)的数量,countDown() 未被调用,因此在调用latch.await( )。
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 2]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 1]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 0]
【讨论】:
【参考方案9】:来自关于CountDownLatch的oracle文档:
一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
CountDownLatch
使用给定的计数进行初始化。 await
方法会阻塞,直到由于调用countDown()
方法而使当前计数达到零,之后所有等待的线程都被释放,任何后续的 await 调用都会立即返回。这是一次性现象——计数无法重置。
CountDownLatch 是一种多功能同步工具,可用于多种用途。
使用计数为 1 初始化的 CountDownLatch
用作简单的开/关锁存器或门:所有调用 await 的线程在门处等待,直到它被调用 countDown() 的线程打开。
初始化为 N 的CountDownLatch
可用于使一个线程等待,直到 N 个线程完成某个动作,或者某个动作已完成 N 次。
public void await()
throws InterruptedException
导致当前线程等待直到锁存器倒计时到零,除非线程被中断。
如果当前计数为零,则此方法立即返回。
public void countDown()
减少锁存器的计数,如果计数达到零,则释放所有等待的线程。
如果当前计数大于零,则递减。如果新计数为零,则重新启用所有等待线程以进行线程调度。
你的例子的解释。
您已将 latch
变量的计数设置为 3
CountDownLatch latch = new CountDownLatch(3);
您已将此共享latch
传递给工作线程:Processor
Processor
的三个Runnable
实例已提交给ExecutorService
executor
主线程(App
)正在等待计数变为零,下面的语句
latch.await();
Processor
线程休眠 3 秒,然后使用 latch.countDown()
递减计数值
由于latch.countDown()
,第一个Process
实例将在完成后将锁存计数更改为2。
由于latch.countDown()
,第二个Process
实例将在完成后将锁存计数更改为1。
由于latch.countDown()
,第三个Process
实例将在完成后将锁存计数更改为0。
锁存器计数为零导致主线程App
从await
出来
应用程序现在打印此输出:Completed
【讨论】:
【参考方案10】:Java Doc 的这个例子帮助我清楚地理解了这些概念:
class Driver // ...
void main() throws InterruptedException
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
class Worker implements Runnable
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal)
this.startSignal = startSignal;
this.doneSignal = doneSignal;
public void run()
try
startSignal.await();
doWork();
doneSignal.countDown();
catch (InterruptedException ex) // return;
void doWork() ...
视觉解读:
显然,CountDownLatch
允许一个线程(此处为 Driver
)等到一堆正在运行的线程(此处为 Worker
)执行完毕。
【讨论】:
【参考方案11】:package practice;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample
public static void main(String[] args) throws InterruptedException
CountDownLatch c= new CountDownLatch(3); // need to decrements the count (3) to zero by calling countDown() method so that main thread will wake up after calling await() method
Task t = new Task(c);
Task t1 = new Task(c);
Task t2 = new Task(c);
t.start();
t1.start();
t2.start();
c.await(); // when count becomes zero main thread will wake up
System.out.println("This will print after count down latch count become zero");
class Task extends Thread
CountDownLatch c;
public Task(CountDownLatch c)
this.c = c;
@Override
public void run()
try
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
c.countDown(); // each thread decrement the count by one
catch (InterruptedException e)
e.printStackTrace();
【讨论】:
【参考方案12】:此链接CountDownLatchExample中解释的countDownLatch的最佳实时示例
【讨论】:
【参考方案13】:最好的选择是CyclicBarrier
,根据https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html
见:
CountDownLatch 使用给定的计数进行初始化。由于调用了 countDown() 方法,等待方法一直阻塞,直到当前计数达到零,之后所有等待的线程都被释放,任何后续的 await 调用立即返回。这是一次性现象——计数无法重置。如果您需要重置计数的版本,请考虑使用 CyclicBarrier。
【讨论】:
以上是关于Java多线程中如何使用CountDownLatch?的主要内容,如果未能解决你的问题,请参考以下文章