如何等待多个线程完成?
Posted
技术标签:
【中文标题】如何等待多个线程完成?【英文标题】:How to wait for a number of threads to complete? 【发布时间】:2010-11-18 03:22:57 【问题描述】:有什么方法可以简单地等待所有线程进程完成?例如,假设我有:
public class DoSomethingInAThread implements Runnable
public static void main(String[] args)
for (int n=0; n<1000; n++)
Thread t = new Thread(new DoSomethingInAThread());
t.start();
// wait for all threads' run() methods to complete before continuing
public void run()
// do something here
如何更改此设置,以便 main()
方法在注释处暂停,直到所有线程的 run()
方法退出?谢谢!
【问题讨论】:
【参考方案1】:问题:
for(i = 0; i < threads.length; i++)
threads[i].join();
...是,threads[i + 1]
永远不能在threads[i]
之前加入。
除了“闩锁”的解决方案之外,所有解决方案都缺乏这个。
这里(还没有)提到ExecutorCompletionService,它允许根据完成顺序加入线程/任务:
public class ExecutorCompletionService<V>
extends Object
implements CompletionService<V>
使用提供的
Executor
执行任务的CompletionService
。此类安排提交的任务在完成后放置在使用 take 可访问的队列中。该类足够轻量级,适合在处理任务组时临时使用。用法示例。
假设您有一组求解某个问题的求解器,每个求解器返回某种类型的值 Result,并希望同时运行它们,以某种方法处理每个返回非空值的结果
use(Result r)
。你可以这样写:void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException CompletionService<Result> cs = new ExecutorCompletionService<>(e); solvers.forEach(cs::submit); for (int i = solvers.size(); i > 0; i--) Result r = cs.take().get(); if (r != null) use(r);
假设您想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务:
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException CompletionService<Result> cs = new ExecutorCompletionService<>(e); int n = solvers.size(); List<Future<Result>> futures = new ArrayList<>(n); Result result = null; try solvers.forEach(solver -> futures.add(cs.submit(solver))); for (int i = n; i > 0; i--) try Result r = cs.take().get(); if (r != null) result = r; break; catch (ExecutionException ignore) finally futures.forEach(future -> future.cancel(true)); if (result != null) use(result);
自:1.5 (!)
假设use(r)
(示例 1)也是异步的,我们有很大的优势。 #
【讨论】:
【参考方案2】:join()
对我没有帮助。在 Kotlin 中查看此示例:
val timeInMillis = System.currentTimeMillis()
ThreadUtils.startNewThread(Runnable
for (i in 1..5)
val t = Thread(Runnable
Thread.sleep(50)
var a = i
kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
Thread.sleep(200)
for (j in 1..5)
a *= j
Thread.sleep(100)
kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
)
t.start()
)
结果:
Thread-5|a=5
Thread-1|a=1
Thread-3|a=3
Thread-2|a=2
Thread-4|a=4
Thread-2|2*1=2
Thread-3|3*1=3
Thread-1|1*1=1
Thread-5|5*1=5
Thread-4|4*1=4
Thread-1|2*2=2
Thread-5|10*2=10
Thread-3|6*2=6
Thread-4|8*2=8
Thread-2|4*2=4
Thread-3|18*3=18
Thread-1|6*3=6
Thread-5|30*3=30
Thread-2|12*3=12
Thread-4|24*3=24
Thread-4|96*4=96
Thread-2|48*4=48
Thread-5|120*4=120
Thread-1|24*4=24
Thread-3|72*4=72
Thread-5|600*5=600
Thread-4|480*5=480
Thread-3|360*5=360
Thread-1|120*5=120
Thread-2|240*5=240
Thread-1|TaskDurationInMillis = 765
Thread-3|TaskDurationInMillis = 765
Thread-4|TaskDurationInMillis = 765
Thread-5|TaskDurationInMillis = 765
Thread-2|TaskDurationInMillis = 765
现在让我将join()
用于线程:
val timeInMillis = System.currentTimeMillis()
ThreadUtils.startNewThread(Runnable
for (i in 1..5)
val t = Thread(Runnable
Thread.sleep(50)
var a = i
kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
Thread.sleep(200)
for (j in 1..5)
a *= j
Thread.sleep(100)
kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
)
t.start()
t.join()
)
结果:
Thread-1|a=1
Thread-1|1*1=1
Thread-1|2*2=2
Thread-1|6*3=6
Thread-1|24*4=24
Thread-1|120*5=120
Thread-1|TaskDurationInMillis = 815
Thread-2|a=2
Thread-2|2*1=2
Thread-2|4*2=4
Thread-2|12*3=12
Thread-2|48*4=48
Thread-2|240*5=240
Thread-2|TaskDurationInMillis = 1568
Thread-3|a=3
Thread-3|3*1=3
Thread-3|6*2=6
Thread-3|18*3=18
Thread-3|72*4=72
Thread-3|360*5=360
Thread-3|TaskDurationInMillis = 2323
Thread-4|a=4
Thread-4|4*1=4
Thread-4|8*2=8
Thread-4|24*3=24
Thread-4|96*4=96
Thread-4|480*5=480
Thread-4|TaskDurationInMillis = 3078
Thread-5|a=5
Thread-5|5*1=5
Thread-5|10*2=10
Thread-5|30*3=30
Thread-5|120*4=120
Thread-5|600*5=600
Thread-5|TaskDurationInMillis = 3833
当我们使用join
时很明显:
-
线程按顺序运行。
第一个样本需要 765 毫秒,而第二个样本需要 3833 毫秒。
我们防止阻塞其他线程的解决方案是创建一个 ArrayList:
val threads = ArrayList<Thread>()
现在,当我们想要启动一个新线程时,我们最常将其添加到 ArrayList 中:
addThreadToArray(
ThreadUtils.startNewThread(Runnable
...
)
)
addThreadToArray
函数:
@Synchronized
fun addThreadToArray(th: Thread)
threads.add(th)
startNewThread
功能:
fun startNewThread(runnable: Runnable) : Thread
val th = Thread(runnable)
th.isDaemon = false
th.priority = Thread.MAX_PRIORITY
th.start()
return th
检查线程的完成情况,如下所示:
val notAliveThreads = ArrayList<Thread>()
for (t in threads)
if (!t.isAlive)
notAliveThreads.add(t)
threads.removeAll(notAliveThreads)
if (threads.size == 0)
// The size is 0 -> there is no alive threads.
【讨论】:
【参考方案3】:作为 CountDownLatch 的替代品,您还可以使用 CyclicBarrier 例如
public class ThreadWaitEx
static CyclicBarrier barrier = new CyclicBarrier(100, new Runnable()
public void run()
System.out.println("clean up job after all tasks are done.");
);
public static void main(String[] args)
for (int i = 0; i < 100; i++)
Thread t = new Thread(new MyCallable(barrier));
t.start();
class MyCallable implements Runnable
private CyclicBarrier b = null;
public MyCallable(CyclicBarrier b)
this.b = b;
@Override
public void run()
try
//do something
System.out.println(Thread.currentThread().getName()+" is waiting for barrier after completing his job.");
b.await();
catch (InterruptedException e)
e.printStackTrace();
catch (BrokenBarrierException e)
e.printStackTrace();
在这种情况下使用 CyclicBarrier,barrier.await() 应该是最后一个语句,即当你的线程完成它的工作时。 CyclicBarrier 可以通过其 reset() 方法再次使用。引用 javadocs:
CyclicBarrier 支持一个可选的 Runnable 命令,该命令在每个屏障点运行一次,在队伍中的最后一个线程到达之后,但在任何线程被释放之前。此屏障操作对于在任何一方继续之前更新共享状态很有用。
【讨论】:
我认为这不是 CyclicBarrier 的好例子。为什么要使用 Thread.sleep() 调用? @Guenther - 是的,我更改了代码以满足要求。 CyclicBarrier 不是 CountDownLatch 的替代品。当线程必须重复倒计时时,您应该创建一个 CyclicBarrier,否则默认为 CountDownLatch(除非需要额外的 Execution 抽象,此时您应该查看更高级别的 Services)。【参考方案4】:您可以使用对象"ThreadGroup" and its parameter activeCount:
【讨论】:
不确定你到底打算怎么做。如果您建议在循环中轮询 activeCount:这很糟糕,因为它是忙碌等待(即使您在轮询之间睡觉 - 您也会在业务和响应能力之间进行权衡)。 @Martin v. Löwis:“加入将只等待一个线程。更好的解决方案可能是 java.util.concurrent.CountDownLatch。只需将锁存器的计数设置为工作线程。每个工作线程在退出之前都应该调用countDown(),而主线程只是调用await(),这将阻塞直到计数器达到零。join()的问题也是你不能开始添加更多线程动态。列表将随着并发修改而爆炸。您的解决方案适用于问题,但不适用于一般目的。【参考方案5】:正如 Martin K 建议的那样,java.util.concurrent.CountDownLatch
似乎是一个更好的解决方案。只需添加一个相同的示例
public class CountDownLatchDemo
public static void main (String[] args)
int noOfThreads = 5;
// Declare the count down latch based on the number of threads you need
// to wait on
final CountDownLatch executionCompleted = new CountDownLatch(noOfThreads);
for (int i = 0; i < noOfThreads; i++)
new Thread()
@Override
public void run ()
System.out.println("I am executed by :" + Thread.currentThread().getName());
try
// Dummy sleep
Thread.sleep(3000);
// One thread has completed its job
executionCompleted.countDown();
catch (InterruptedException e)
// TODO Auto-generated catch block
e.printStackTrace();
.start();
try
// Wait till the count down latch opens.In the given case till five
// times countDown method is invoked
executionCompleted.await();
System.out.println("All over");
catch (InterruptedException e)
e.printStackTrace();
【讨论】:
【参考方案6】:如果你创建一个线程列表,你可以遍历它们并针对每个线程使用 .join() ,当所有线程都有时,你的循环就会结束。不过我没试过。
http://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#join()
【讨论】:
嗨,由于某种原因,它对我不起作用。这是我的问题:***.com/users/5144855/ruchir-baronia【参考方案7】:一种方法是创建Thread
s 的List
,创建并启动每个线程,同时将其添加到列表中。启动所有内容后,循环返回列表并在每个列表上调用join()
。线程完成执行的顺序无关紧要,您需要知道的是,当第二个循环完成执行时,每个线程都将完成。
更好的方法是使用ExecutorService 及其相关方法:
List<Callable> callables = ... // assemble list of Callables here
// Like Runnable but can return a value
ExecutorService execSvc = Executors.newCachedThreadPool();
List<Future<?>> results = execSvc.invokeAll(callables);
// Note: You may not care about the return values, in which case don't
// bother saving them
使用 ExecutorService(以及来自 Java 5 的 concurrency utilities 的所有新东西)非常灵活,上面的示例几乎没有触及表面。
【讨论】:
线程组是要走的路!使用可变列表你会遇到麻烦(同步) 什么?你怎么会惹上麻烦?它仅由正在启动的线程可变(仅可读),因此只要它不修改列表同时遍历它,就可以了。 这取决于你如何使用它。如果你在线程中使用调用类,你会遇到问题。【参考方案8】:您可以使用CountDownLatch 而不是join()
,这是一个旧的API。我已将您的代码修改如下以满足您的要求。
import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable
CountDownLatch latch;
public DoSomethingInAThread(CountDownLatch latch)
this.latch = latch;
public void run()
try
System.out.println("Do some thing");
latch.countDown();
catch(Exception err)
err.printStackTrace();
public class CountDownLatchDemo
public static void main(String[] args)
try
CountDownLatch latch = new CountDownLatch(1000);
for (int n=0; n<1000; n++)
Thread t = new Thread(new DoSomethingInAThread(latch));
t.start();
latch.await();
System.out.println("In Main thread after completion of 1000 threads");
catch(Exception err)
err.printStackTrace();
解释:
CountDownLatch
已根据您的要求使用给定计数 1000 进行初始化。
每个工作线程DoSomethingInAThread
都会递减CountDownLatch
,它已经传入构造函数。
主线程 CountDownLatchDemo
await()
直到计数变为零。一旦计数变为零,您将得到低于线的输出。
In Main thread after completion of 1000 threads
来自 oracle 文档页面的更多信息
public void await()
throws InterruptedException
导致当前线程等待直到锁存器倒计时到零,除非线程被中断。
有关其他选项,请参阅相关的 SE 问题:
wait until all threads finish their work in java
【讨论】:
【参考方案9】:考虑使用java.util.concurrent.CountDownLatch
。 javadocs中的示例
【讨论】:
是线程的锁存器,锁存器锁与倒计时一起工作。在线程的 run() 方法中明确声明等待 CountDownLatch 达到它的倒计时到 0。您可以在多个线程中使用相同的 CountDownLatch 来同时释放它们。不知道是不是你需要的,只想提一下,因为在多线程环境下工作时很有用。 也许你应该把这个解释放在你的答案正文中? Javadoc 中的示例非常具有描述性,因此我没有添加任何示例。 docs.oracle.com/javase/7/docs/api/java/util/concurrent/…。在第一个示例中,所有 Workers 线程同时释放,因为它们等待 CountdownLatch startSignal 达到零,这发生在 startSignal.countDown() 中。然后,mian 线程使用 doneSignal.await() 指令等待所有工作完成。 doneSignal 在每个工作人员中降低其值。【参考方案10】:在第一个 for 循环中创建线程对象。
for (int i = 0; i < threads.length; i++)
threads[i] = new Thread(new Runnable()
public void run()
// some code to run in parallel
);
threads[i].start();
然后这里的每个人都在说什么。
for(i = 0; i < threads.length; i++)
threads[i].join();
【讨论】:
【参考方案11】:import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class DoSomethingInAThread implements Runnable
public static void main(String[] args) throws ExecutionException, InterruptedException
//limit the number of actual threads
int poolSize = 10;
ExecutorService service = Executors.newFixedThreadPool(poolSize);
List<Future<Runnable>> futures = new ArrayList<Future<Runnable>>();
for (int n = 0; n < 1000; n++)
Future f = service.submit(new DoSomethingInAThread());
futures.add(f);
// wait for all tasks to complete before continuing
for (Future<Runnable> f : futures)
f.get();
//shut down the executor service so that this thread can exit
service.shutdownNow();
public void run()
// do something here
【讨论】:
工作就像一个魅力......我有两组线程,由于多个 cookie 的问题,它们不应该同时运行。我使用您的示例一次运行一组线程。感谢您分享您的知识... @Dantalian - 在您的 Runnable 类中(可能在 run 方法中),您可能希望捕获发生的任何异常并将它们存储在本地(或存储错误消息/条件)。在示例中,f.get() 返回您提交给 ExecutorService 的对象。您的对象可能有一个检索任何异常/错误条件的方法。根据您修改提供的示例的方式,您可能需要将 f.get() 转换的对象转换为您的预期类型。【参考方案12】:根据您的需要,您可能还想查看 java.util.concurrent 包中的 CountDownLatch 和 CyclicBarrier 类。如果您希望您的线程相互等待,或者您希望对线程的执行方式进行更细粒度的控制(例如,在它们的内部执行中等待另一个线程设置某些状态),它们会很有用。您还可以使用 CountDownLatch 来通知所有线程同时启动,而不是在遍历循环时一个接一个地启动它们。标准 API 文档有一个这样的例子,加上使用另一个 CountDownLatch 来等待所有线程完成它们的执行。
【讨论】:
【参考方案13】:完全避免使用 Thread 类,而是使用 java.util.concurrent 中提供的更高抽象
ExecutorService 类提供的method invokeAll 似乎可以满足您的需求。
【讨论】:
【参考方案14】:你把所有线程放在一个数组中,全部启动,然后有一个循环
for(i = 0; i < threads.length; i++)
threads[i].join();
每个连接都会阻塞,直到各自的线程完成。线程的完成顺序可能与您加入它们的顺序不同,但这不是问题:当循环退出时,所有线程都已完成。
【讨论】:
@Mykola:究竟使用线程组的优势是什么?仅仅因为 API 存在并不意味着您必须使用它... 参见:“一个线程组代表一组线程。”对于这个用例,这是语义正确的!并且:“一个线程被允许访问关于它自己的线程组的信息” 《Effective Java》一书建议避免使用线程组(第 73 项)。 Effective Java 中提到的 bug 应该在 Java 6 中已经修复。如果 Java 新版本不是限制,最好使用 Futures 来解决线程问题。 Martin v. Löwis:你是对的。这与该问题无关,但很高兴从一个对象(如 ExecutorService)获得有关正在运行的线程的更多信息。我认为使用给定的特性来解决问题很好;也许您将来需要更多的灵活性(线程信息)。提及旧 JDK 中的旧错误类也是正确的。 ThreadGroup 没有实现组级join,所以人们为什么要推动ThreadGroup 有点莫名其妙。人们真的在使用自旋锁并查询组的 activeCount 吗?与仅在所有线程上调用 join 相比,您很难说服我这样做更好。以上是关于如何等待多个线程完成?的主要内容,如果未能解决你的问题,请参考以下文章