如何等待多个线程完成?

Posted

技术标签:

【中文标题】如何等待多个线程完成?【英文标题】:How to wait for a number of threads to complete? 【发布时间】:2010-11-18 03:22:57 【问题描述】:

有什么方法可以简单地等待所有线程进程完成?例如,假设我有:

public class DoSomethingInAThread implements Runnable

    public static void main(String[] args) 
        for (int n=0; n<1000; n++) 
            Thread t = new Thread(new DoSomethingInAThread());
            t.start();
        
        // wait for all threads' run() methods to complete before continuing
    

    public void run() 
        // do something here
    



如何更改此设置,以便 main() 方法在注释处暂停,直到所有线程的 run() 方法退出?谢谢!

【问题讨论】:

【参考方案1】:

问题:

for(i = 0; i < threads.length; i++)
  threads[i].join();

...是,threads[i + 1] 永远不能在threads[i] 之前加入。 除了“闩锁”的解决方案之外,所有解决方案都缺乏这个。

这里(还没有)提到ExecutorCompletionService,它允许根据完成顺序加入线程/任务:

public class ExecutorCompletionService&lt;V&gt; extends Object implements CompletionService&lt;V&gt;

使用提供的Executor 执行任务的CompletionService。此类安排提交的任务在完成后放置在使用 take 可访问的队列中。该类足够轻量级,适合在处理任务组时临时使用。

用法示例。

假设您有一组求解某个问题的求解器,每个求解器返回某种类型的值 Result,并希望同时运行它们,以某种方法处理每个返回非空值的结果use(Result r)。你可以这样写:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException 
  CompletionService<Result> cs = new ExecutorCompletionService<>(e);
  solvers.forEach(cs::submit);
  for (int i = solvers.size(); i > 0; i--) 
    Result r = cs.take().get();
    if (r != null)
      use(r);
  

假设您想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException 
  CompletionService<Result> cs = new ExecutorCompletionService<>(e);
  int n = solvers.size();
  List<Future<Result>> futures = new ArrayList<>(n);
  Result result = null;
  try 
    solvers.forEach(solver -> futures.add(cs.submit(solver)));
    for (int i = n; i > 0; i--) 
      try 
        Result r = cs.take().get();
        if (r != null) 
          result = r;
          break;
        
       catch (ExecutionException ignore) 
    
   finally 
    futures.forEach(future -> future.cancel(true));
  

  if (result != null)
    use(result);

自:1.5 (!)

假设use(r)(示例 1)也是异步的,我们有很大的优势。 #

【讨论】:

【参考方案2】:

join() 对我没有帮助。在 Kotlin 中查看此示例:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable 
        for (i in 1..5) 
            val t = Thread(Runnable 
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) 
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            )
            t.start()
        
    )

结果:

Thread-5|a=5
Thread-1|a=1
Thread-3|a=3
Thread-2|a=2
Thread-4|a=4
Thread-2|2*1=2
Thread-3|3*1=3
Thread-1|1*1=1
Thread-5|5*1=5
Thread-4|4*1=4
Thread-1|2*2=2
Thread-5|10*2=10
Thread-3|6*2=6
Thread-4|8*2=8
Thread-2|4*2=4
Thread-3|18*3=18
Thread-1|6*3=6
Thread-5|30*3=30
Thread-2|12*3=12
Thread-4|24*3=24
Thread-4|96*4=96
Thread-2|48*4=48
Thread-5|120*4=120
Thread-1|24*4=24
Thread-3|72*4=72
Thread-5|600*5=600
Thread-4|480*5=480
Thread-3|360*5=360
Thread-1|120*5=120
Thread-2|240*5=240
Thread-1|TaskDurationInMillis = 765
Thread-3|TaskDurationInMillis = 765
Thread-4|TaskDurationInMillis = 765
Thread-5|TaskDurationInMillis = 765
Thread-2|TaskDurationInMillis = 765

现在让我将join() 用于线程:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable 
        for (i in 1..5) 
            val t = Thread(Runnable 
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) 
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            )
            t.start()
            t.join()
        
    )

结果:

Thread-1|a=1
Thread-1|1*1=1
Thread-1|2*2=2
Thread-1|6*3=6
Thread-1|24*4=24
Thread-1|120*5=120
Thread-1|TaskDurationInMillis = 815
Thread-2|a=2
Thread-2|2*1=2
Thread-2|4*2=4
Thread-2|12*3=12
Thread-2|48*4=48
Thread-2|240*5=240
Thread-2|TaskDurationInMillis = 1568
Thread-3|a=3
Thread-3|3*1=3
Thread-3|6*2=6
Thread-3|18*3=18
Thread-3|72*4=72
Thread-3|360*5=360
Thread-3|TaskDurationInMillis = 2323
Thread-4|a=4
Thread-4|4*1=4
Thread-4|8*2=8
Thread-4|24*3=24
Thread-4|96*4=96
Thread-4|480*5=480
Thread-4|TaskDurationInMillis = 3078
Thread-5|a=5
Thread-5|5*1=5
Thread-5|10*2=10
Thread-5|30*3=30
Thread-5|120*4=120
Thread-5|600*5=600
Thread-5|TaskDurationInMillis = 3833

当我们使用join 时很明显:

    线程按顺序运行。 第一个样本需要 765 毫秒,而第二个样本需要 3833 毫秒。

我们防止阻塞其他线程的解决方案是创建一个 ArrayList:

val threads = ArrayList<Thread>()

现在,当我们想要启动一个新线程时,我们最常将其添加到 ArrayList 中:

addThreadToArray(
    ThreadUtils.startNewThread(Runnable 
        ...
    )
)

addThreadToArray 函数:

@Synchronized
fun addThreadToArray(th: Thread) 
    threads.add(th)

startNewThread 功能:

fun startNewThread(runnable: Runnable) : Thread 
    val th = Thread(runnable)
    th.isDaemon = false
    th.priority = Thread.MAX_PRIORITY
    th.start()
    return th

检查线程的完成情况,如下所示:

val notAliveThreads = ArrayList<Thread>()
for (t in threads)
    if (!t.isAlive)
        notAliveThreads.add(t)
threads.removeAll(notAliveThreads)
if (threads.size == 0)
    // The size is 0 -> there is no alive threads.

【讨论】:

【参考方案3】:

作为 CountDownLatch 的替代品,您还可以使用 CyclicBarrier 例如

public class ThreadWaitEx 
    static CyclicBarrier barrier = new CyclicBarrier(100, new Runnable()
        public void run()
            System.out.println("clean up job after all tasks are done.");
        
    );
    public static void main(String[] args) 
        for (int i = 0; i < 100; i++) 
            Thread t = new Thread(new MyCallable(barrier));
            t.start();
               
    

    

class MyCallable implements Runnable
    private CyclicBarrier b = null;
    public MyCallable(CyclicBarrier b)
        this.b = b;
    
    @Override
    public void run()
        try 
            //do something
            System.out.println(Thread.currentThread().getName()+" is waiting for barrier after completing his job.");
            b.await();
         catch (InterruptedException e) 
            e.printStackTrace();
         catch (BrokenBarrierException e) 
            e.printStackTrace();
        
           

在这种情况下使用 CyclicBarrier,barrier.await() 应该是最后一个语句,即当你的线程完成它的工作时。 CyclicBarrier 可以通过其 reset() 方法再次使用。引用 javadocs:

CyclicBarrier 支持一个可选的 Runnable 命令,该命令在每个屏障点运行一次,在队伍中的最后一个线程到达之后,但在任何线程被释放之前。此屏障操作对于在任何一方继续之前更新共享状态很有用。

【讨论】:

我认为这不是 CyclicBarrier 的好例子。为什么要使用 Thread.sleep() 调用? @Guenther - 是的,我更改了代码以满足要求。 CyclicBarrier 不是 CountDownLatch 的替代品。当线程必须重复倒计时时,您应该创建一个 CyclicBarrier,否则默认为 CountDownLatch(除非需要额外的 Execution 抽象,此时您应该查看更高级别的 Services)。【参考方案4】:

您可以使用对象"ThreadGroup" and its parameter activeCount:

【讨论】:

不确定你到底打算怎么做。如果您建议在循环中轮询 activeCount:这很糟糕,因为它是忙碌等待(即使您在轮询之间睡觉 - 您也会在业务和响应能力之间进行权衡)。 @Martin v. Löwis:“加入将只等待一个线程。更好的解决方案可能是 java.util.concurrent.CountDownLatch。只需将锁存器的计数设置为工作线程。每个工作线程在退出之前都应该调用countDown(),而主线程只是调用await(),这将阻塞直到计数器达到零。join()的问题也是你不能开始添加更多线程动态。列表将随着并发修改而爆炸。您的解决方案适用于问题,但不适用于一般目的。【参考方案5】:

正如 Martin K 建议的那样,java.util.concurrent.CountDownLatch 似乎是一个更好的解决方案。只需添加一个相同的示例

     public class CountDownLatchDemo


    public static void main (String[] args)
    
        int noOfThreads = 5;
        // Declare the count down latch based on the number of threads you need
        // to wait on
        final CountDownLatch executionCompleted = new CountDownLatch(noOfThreads);
        for (int i = 0; i < noOfThreads; i++)
        
            new Thread()
            

                @Override
                public void run ()
                

                    System.out.println("I am executed by :" + Thread.currentThread().getName());
                    try
                    
                        // Dummy sleep
                        Thread.sleep(3000);
                        // One thread has completed its job
                        executionCompleted.countDown();
                    
                    catch (InterruptedException e)
                    
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    
                

            .start();
        

        try
        
            // Wait till the count down latch opens.In the given case till five
            // times countDown method is invoked
            executionCompleted.await();
            System.out.println("All over");
        
        catch (InterruptedException e)
        
            e.printStackTrace();
        
    


【讨论】:

【参考方案6】:

如果你创建一个线程列表,你可以遍历它们并针对每个线程使用 .join() ,当所有线程都有时,你的循环就会结束。不过我没试过。

http://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#join()

【讨论】:

嗨,由于某种原因,它对我不起作用。这是我的问题:***.com/users/5144855/ruchir-baronia【参考方案7】:

一种方法是创建Threads 的List,创建并启动每个线程,同时将其添加到列表中。启动所有内容后,循环返回列表并在每个列表上调用join()。线程完成执行的顺序无关紧要,您需要知道的是,当第二个循环完成执行时,每个线程都将完成。

更好的方法是使用ExecutorService 及其相关方法:

List<Callable> callables = ... // assemble list of Callables here
                               // Like Runnable but can return a value
ExecutorService execSvc = Executors.newCachedThreadPool();
List<Future<?>> results = execSvc.invokeAll(callables);
// Note: You may not care about the return values, in which case don't
//       bother saving them

使用 ExecutorService(以及来自 Java 5 的 concurrency utilities 的所有新东西)非常灵活,上面的示例几乎没有触及表面。

【讨论】:

线程组是要走的路!使用可变列表你会遇到麻烦(同步) 什么?你怎么会惹上麻烦?它仅由正在启动的线程可变(仅可读),因此只要它不修改列表同时遍历它,就可以了。 这取决于你如何使用它。如果你在线程中使用调用类,你会遇到问题。【参考方案8】:

您可以使用CountDownLatch 而不是join(),这是一个旧的API。我已将您的代码修改如下以满足您的要求。

import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch)
        this.latch = latch;
     
    public void run() 
        try
            System.out.println("Do some thing");
            latch.countDown();
        catch(Exception err)
            err.printStackTrace();
        
    


public class CountDownLatchDemo 
    public static void main(String[] args) 
        try
            CountDownLatch latch = new CountDownLatch(1000);
            for (int n=0; n<1000; n++) 
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            
            latch.await();
            System.out.println("In Main thread after completion of 1000 threads");
        catch(Exception err)
            err.printStackTrace();
        
    

解释

    CountDownLatch 已根据您的要求使用给定计数 1000 进行初始化。

    每个工作线程DoSomethingInAThread都会递减CountDownLatch,它已经传入构造函数。

    主线程 CountDownLatchDemo await() 直到计数变为零。一旦计数变为零,您将得到低于线的输出。

    In Main thread after completion of 1000 threads
    

来自 oracle 文档页面的更多信息

public void await()
           throws InterruptedException

导致当前线程等待直到锁存器倒计时到零,除非线程被中断。

有关其他选项,请参阅相关的 SE 问题:

wait until all threads finish their work in java

【讨论】:

【参考方案9】:

考虑使用java.util.concurrent.CountDownLatch。 javadocs中的示例

【讨论】:

是线程的锁存器,锁存器锁与倒计时一起工作。在线程的 run() 方法中明确声明等待 CountDownLatch 达到它的倒计时到 0。您可以在多个线程中使用相同的 CountDownLatch 来同时释放它们。不知道是不是你需要的,只想提一下,因为在多线程环境下工作时很有用。 也许你应该把这个解释放在你的答案正文中? Javadoc 中的示例非常具有描述性,因此我没有添加任何示例。 docs.oracle.com/javase/7/docs/api/java/util/concurrent/…。在第一个示例中,所有 Workers 线程同时释放,因为它们等待 CountdownLatch startSignal 达到零,这发生在 startSignal.countDown() 中。然后,mian 线程使用 doneSignal.await() 指令等待所有工作完成。 doneSignal 在每个工作人员中降低其值。【参考方案10】:

在第一个 for 循环中创建线程对象。

for (int i = 0; i < threads.length; i++) 
     threads[i] = new Thread(new Runnable() 
         public void run() 
             // some code to run in parallel
         
     );
     threads[i].start();
 

然后这里的每个人都在说什么。

for(i = 0; i < threads.length; i++)
  threads[i].join();

【讨论】:

【参考方案11】:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DoSomethingInAThread implements Runnable

   public static void main(String[] args) throws ExecutionException, InterruptedException
   
      //limit the number of actual threads
      int poolSize = 10;
      ExecutorService service = Executors.newFixedThreadPool(poolSize);
      List<Future<Runnable>> futures = new ArrayList<Future<Runnable>>();

      for (int n = 0; n < 1000; n++)
      
         Future f = service.submit(new DoSomethingInAThread());
         futures.add(f);
      

      // wait for all tasks to complete before continuing
      for (Future<Runnable> f : futures)
      
         f.get();
      

      //shut down the executor service so that this thread can exit
      service.shutdownNow();
   

   public void run()
   
      // do something here
   

【讨论】:

工作就像一个魅力......我有两组线程,由于多个 cookie 的问题,它们不应该同时运行。我使用您的示例一次运行一组线程。感谢您分享您的知识... @Dantalian - 在您的 Runnable 类中(可能在 run 方法中),您可能希望捕获发生的任何异常并将它们存储在本地(或存储错误消息/条件)。在示例中,f.get() 返回您提交给 ExecutorService 的对象。您的对象可能有一个检索任何异常/错误条件的方法。根据您修改提供的示例的方式,您可能需要将 f.get() 转换的对象转换为您的预期类型。【参考方案12】:

根据您的需要,您可能还想查看 java.util.concurrent 包中的 CountDownLatch 和 CyclicBarrier 类。如果您希望您的线程相互等待,或者您希望对线程的执行方式进行更细粒度的控制(例如,在它们的内部执行中等待另一个线程设置某些状态),它们会很有用。您还可以使用 CountDownLatch 来通知所有线程同时启动,而不是在遍历循环时一个接一个地启动它们。标准 API 文档有一个这样的例子,加上使用另一个 CountDownLatch 来等待所有线程完成它们的执行。

【讨论】:

【参考方案13】:

完全避免使用 Thread 类,而是使用 java.util.concurrent 中提供的更高抽象

ExecutorService 类提供的method invokeAll 似乎可以满足您的需求。

【讨论】:

【参考方案14】:

你把所有线程放在一个数组中,全部启动,然后有一个循环

for(i = 0; i < threads.length; i++)
  threads[i].join();

每个连接都会阻塞,直到各自的线程完成。线程的完成顺序可能与您加入它们的顺序不同,但这不是问题:当循环退出时,所有线程都已完成。

【讨论】:

@Mykola:究竟使用线程组的优势是什么?仅仅因为 API 存在并不意味着您必须使用它... 参见:“一个线程组代表一组线程。”对于这个用例,这是语义正确的!并且:“一个线程被允许访问关于它自己的线程组的信息” 《Effective Java》一书建议避免使用线程组(第 73 项)。 Effective Java 中提到的 bug 应该在 Java 6 中已经修复。如果 Java 新版本不是限制,最好使用 Futures 来解决线程问题。 Martin v. Löwis:你是对的。这与该问题无关,但很高兴从一个对象(如 ExecutorService)获得有关正在运行的线程的更多信息。我认为使用给定的特性来解决问题很好;也许您将来需要更多的灵活性(线程信息)。提及旧 JDK 中的旧错误类也是正确的。 ThreadGroup 没有实现组级join,所以人们为什么要推动ThreadGroup 有点莫名其妙。人们真的在使用自旋锁并查询组的 activeCount 吗?与仅在所有线程上调用 join 相比,您很难说服我这样做更好。

以上是关于如何等待多个线程完成?的主要内容,如果未能解决你的问题,请参考以下文章

创建多个线程并等待所有线程完成

Kotlin Coroutines:等待多个线程完成

如何等待多个服务完成?

主程序如何通过等待而不是加入同时等待多个线程?

如何等待多个异步功能完成?

pthread_join - 多个线程等待