如何在 Java 中等待多个任务完成?

Posted

技术标签:

【中文标题】如何在 Java 中等待多个任务完成?【英文标题】:How to wait for completion of multiple tasks in Java? 【发布时间】:2016-04-15 11:00:24 【问题描述】:

在 Java 应用程序中实现并发的正确方法是什么?我知道线程之类的东西,当然,我已经为 Java 编程 10 年了,但在并发方面没有太多经验。

例如,我必须异步加载一些资源,只有加载完之后,我才能继续做更多的工作。不用说,没有顺序他们将如何完成。我该怎么做?

javascript 中,我喜欢使用 jQuery.deferred 基础架构,比如说

$.when(deferred1,deferred2,deferred3...)
 .done(
   function()//here everything is done
    ...
   );

但是我在 Java 中做什么呢?

【问题讨论】:

什么版本的Java? 看看RxJava 在谷歌上查找 Java 线程。在 Java 中很容易实现并发。 @FelipeSulser 很容易做错,正如关于 SO 的问题经常证明的那样。 CountdownLatch 或 CyclicBarrier 可以提供帮助 【参考方案1】:

您可以通过多种方式实现它。

1.ExecutorServiceinvokeAll()API

执行给定的任务,当所有任务完成时返回一个包含状态和结果的 Futures 列表。

2.CountDownLatch

一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

CountDownLatch 使用给定的计数进行初始化。由于调用countDown() 方法,await 方法一直阻塞,直到当前计数达到零,之后所有等待的线程都被释放,任何后续的 await 调用立即返回。这是一次性现象——计数无法重置。如果您需要重置计数的版本,请考虑使用 CyclicBarrier。

3.ForkJoinPool 或 newWorkStealingPool() in Executors 是其他方式

查看相关的 SE 问题:

How to wait for a thread that spawns it's own thread?

Executors: How to synchronously wait until all tasks have finished if tasks are created recursively?

【讨论】:

很好的答案。我还要添加 CyclicBarrier——它有一个构造函数,它接受一个 Runnable,它将在所有任务完成后执行:docs.oracle.com/javase/8/docs/api/java/util/concurrent/… 在不阻塞任何线程的情况下,做同样事情的正确方法是什么?【参考方案2】:

我会使用并行流。

Stream.of(runnable1, runnable2, runnable3).parallel().forEach(r -> r.run());
// do something after all these are done.

如果您需要它是异步的,那么您可以使用池或线程。

我必须异步加载一些资源,

你可以像这样收集这些资源。

List<String> urls = ....

Map<String, String> map = urls.parallelStream()
                              .collect(Collectors.toMap(u -> u, u -> download(u)));

这将为您提供所有资源的映射,一旦它们被同时下载。并发将是您默认拥有的 CPU 数量。

【讨论】:

@RobinJonsson forEach 在所有任务完成之前不会返回。不支持异步运行流代码,这就是为什么你需要做其他事情的原因。 你并不总是可以运行,你也不总是可以控制异步。例如,我正在调用一个名为getMapAsync(OnMapReadyCallback). 的外部方法,它立即返回,并启动它自己的线程。我还需要以类似的方式加载其他内容,并在一切完成后执行代码。我认为我找到的最佳解决方案是 CountDownLatch 是正确的解决方案。【参考方案3】:

如果我不使用并行 Streams 或 Spring MVC 的 TaskExecutor,我通常使用CountDownLatch。使用 # of tasks 实例化,为每个完成其任务的线程减少一次。 CountDownLatch.await() 等到latch 为0。真的很有用。

在此处阅读更多信息:JavaDocs

【讨论】:

这和Semaphore有什么区别? @doom777 CountdownLatch 用于启动一系列线程,然后等待它们全部完成(或直到它们调用 countDown() 给定次数)。信号量用于控制正在使用资源的并发线程的数量。该资源可以是文件之类的东西,也可以是通过限制执行线程数的 cpu。随着不同线程调用acquire() 和release(),信号量的计数可以上下浮动。 是的,但CountdownLatch不只是一个上不去的信号量吗? 它的用途不止于此。它基本上可以作为一个简单的执行器池。仅将它用于“一个方向”计数(向上向下)对我来说似乎毫无意义。【参考方案4】:

就个人而言,如果我使用的是 Java 8 或更高版本,我会这样做。

// Retrieving instagram followers
CompletableFuture<Integer> instagramFollowers = CompletableFuture.supplyAsync(() -> 
    // getInstaFollowers(userId);

    return 0; // default value
);

// Retrieving twitter followers
CompletableFuture<Integer> twitterFollowers = CompletableFuture.supplyAsync(() -> 
    // getTwFollowers(userId);

    return 0; // default value
);

System.out.println("Calculating Total Followers...");
CompletableFuture<Integer> totalFollowers = instagramFollowers
    .thenCombine(twitterFollowers, (instaFollowers, twFollowers) -> 
         return instaFollowers + twFollowers; // can be replaced with method reference
);

System.out.println("Total followers: " + totalFollowers.get()); // blocks until both the above tasks are complete

我使用了supplyAsync(),因为我正在从任务中返回一些值(在这种情况下为关注者数量),否则我可以使用runAsync()。这两个都在单独的线程中运行任务。

最后,我使用thenCombine() 加入了CompletableFuture。如果一个依赖于另一个,您也可以使用thenCompose() 加入两个CompletableFuture。但是在这种情况下,由于这两个任务可以并行执行,所以我使用了thenCombine()

getInstaFollowers(userId)getTwFollowers(userId) 方法是简单的 HTTP 调用之类的。

【讨论】:

【参考方案5】:

您可以使用 ThreadPool 和 Executors 来执行此操作。

https://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html

【讨论】:

【参考方案6】:

这是我使用线程的示例。它是一个固定大小为 50 个线程的静态执行器服务。

public class ThreadPoolExecutor 

private static final ExecutorService executorService = Executors.newFixedThreadPool(50,
        new ThreadFactoryBuilder().setNameFormat("thread-%d").build());

private static ThreadPoolExecutor instance = new ThreadPoolExecutor();

public static ThreadPoolExecutor getInstance() 
    return instance;


public <T> Future<? extends T> queueJob(Callable<? extends T> task) 
    return executorService.submit(task);


public void shutdown() 
    executorService.shutdown();


执行器的业务逻辑是这样使用的:(可以使用Callable或Runnable。Callable可以返回一些东西,Runnable不行)

public class MultipleExecutor implements Callable<ReturnType> //your code

以及执行者的调用:

ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutor.getInstance();

List<Future<? extends ReturnType>> results = new LinkedList<>();

for (Type Type : typeList) 
            Future<? extends ReturnType> future = threadPoolExecutor.queueJob(
                    new MultipleExecutor(needed parameters));
            results.add(future);
        

        for (Future<? extends ReturnType> result : results) 
            try 
                if (result.get() != null) 
                    result.get(); // here you get the return of one thread
                
             catch (InterruptedException | ExecutionException e) 
                logger.error(e, e);
            
        

【讨论】:

【参考方案7】:

jQuery 中的$.Deferred 的行为相同,您可以使用名为CompletableFuture 的类在Java 8 中存档。这个类提供了使用Promises 的API。为了创建异步代码,您可以使用其中一种static 创建方法,例如#runAsync#supplyAsync。然后使用#thenApply 对结果进行一些计算。

【讨论】:

【参考方案8】:

我通常选择异步通知开始、通知进度、通知结束方法:

class Task extends Thread 
    private ThreadLauncher parent;

    public Task(ThreadLauncher parent) 
        super();
        this.parent = parent;
    

    public void run() 
        doStuff();

        parent.notifyEnd(this);
    

    public /*abstract*/ void doStuff() 
        // ...
    



class ThreadLauncher 

    public void stuff() 
        for (int i=0; i<10; i++)
            new Task(this).start(); 
    

    public void notifyEnd(Task who) 
        // ...
    

【讨论】:

是的,如果你愿意,你可以实现 Runnable

以上是关于如何在 Java 中等待多个任务完成?的主要内容,如果未能解决你的问题,请参考以下文章

如何等待java线程池中所有任务完成

java-等待唤醒机制(线程中的通信)-线程池

等待多个并发事件完成的模型

面试官:如何让主线程等待所有的子线程结束之后再执行?我懵了

如何等待并行任务完成

Java并发编程基础