如何在 Java 中等待多个任务完成?
Posted
技术标签:
【中文标题】如何在 Java 中等待多个任务完成?【英文标题】:How to wait for completion of multiple tasks in Java? 【发布时间】:2016-04-15 11:00:24 【问题描述】:在 Java 应用程序中实现并发的正确方法是什么?我知道线程之类的东西,当然,我已经为 Java 编程 10 年了,但在并发方面没有太多经验。
例如,我必须异步加载一些资源,只有加载完之后,我才能继续做更多的工作。不用说,没有顺序他们将如何完成。我该怎么做?
在 javascript 中,我喜欢使用 jQuery.deferred
基础架构,比如说
$.when(deferred1,deferred2,deferred3...)
.done(
function()//here everything is done
...
);
但是我在 Java 中做什么呢?
【问题讨论】:
什么版本的Java? 看看RxJava 在谷歌上查找 Java 线程。在 Java 中很容易实现并发。 @FelipeSulser 很容易做错,正如关于 SO 的问题经常证明的那样。 CountdownLatch 或 CyclicBarrier 可以提供帮助 【参考方案1】:您可以通过多种方式实现它。
1.ExecutorServiceinvokeAll()
API
执行给定的任务,当所有任务完成时返回一个包含状态和结果的 Futures 列表。
2.CountDownLatch
一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
CountDownLatch
使用给定的计数进行初始化。由于调用countDown()
方法,await 方法一直阻塞,直到当前计数达到零,之后所有等待的线程都被释放,任何后续的 await 调用立即返回。这是一次性现象——计数无法重置。如果您需要重置计数的版本,请考虑使用 CyclicBarrier。
3.ForkJoinPool 或 newWorkStealingPool()
in Executors 是其他方式
查看相关的 SE 问题:
How to wait for a thread that spawns it's own thread?
Executors: How to synchronously wait until all tasks have finished if tasks are created recursively?
【讨论】:
很好的答案。我还要添加 CyclicBarrier——它有一个构造函数,它接受一个 Runnable,它将在所有任务完成后执行:docs.oracle.com/javase/8/docs/api/java/util/concurrent/… 在不阻塞任何线程的情况下,做同样事情的正确方法是什么?【参考方案2】:我会使用并行流。
Stream.of(runnable1, runnable2, runnable3).parallel().forEach(r -> r.run());
// do something after all these are done.
如果您需要它是异步的,那么您可以使用池或线程。
我必须异步加载一些资源,
你可以像这样收集这些资源。
List<String> urls = ....
Map<String, String> map = urls.parallelStream()
.collect(Collectors.toMap(u -> u, u -> download(u)));
这将为您提供所有资源的映射,一旦它们被同时下载。并发将是您默认拥有的 CPU 数量。
【讨论】:
@RobinJonsson forEach 在所有任务完成之前不会返回。不支持异步运行流代码,这就是为什么你需要做其他事情的原因。 你并不总是可以运行,你也不总是可以控制异步。例如,我正在调用一个名为getMapAsync(OnMapReadyCallback).
的外部方法,它立即返回,并启动它自己的线程。我还需要以类似的方式加载其他内容,并在一切完成后执行代码。我认为我找到的最佳解决方案是 CountDownLatch
是正确的解决方案。【参考方案3】:
如果我不使用并行 Streams 或 Spring MVC 的 TaskExecutor,我通常使用CountDownLatch
。使用 # of tasks 实例化,为每个完成其任务的线程减少一次。 CountDownLatch.await()
等到latch 为0。真的很有用。
在此处阅读更多信息:JavaDocs
【讨论】:
这和Semaphore有什么区别? @doom777 CountdownLatch 用于启动一系列线程,然后等待它们全部完成(或直到它们调用 countDown() 给定次数)。信号量用于控制正在使用资源的并发线程的数量。该资源可以是文件之类的东西,也可以是通过限制执行线程数的 cpu。随着不同线程调用acquire() 和release(),信号量的计数可以上下浮动。 是的,但CountdownLatch
不只是一个上不去的信号量吗?
它的用途不止于此。它基本上可以作为一个简单的执行器池。仅将它用于“一个方向”计数(向上或向下)对我来说似乎毫无意义。【参考方案4】:
就个人而言,如果我使用的是 Java 8 或更高版本,我会这样做。
// Retrieving instagram followers
CompletableFuture<Integer> instagramFollowers = CompletableFuture.supplyAsync(() ->
// getInstaFollowers(userId);
return 0; // default value
);
// Retrieving twitter followers
CompletableFuture<Integer> twitterFollowers = CompletableFuture.supplyAsync(() ->
// getTwFollowers(userId);
return 0; // default value
);
System.out.println("Calculating Total Followers...");
CompletableFuture<Integer> totalFollowers = instagramFollowers
.thenCombine(twitterFollowers, (instaFollowers, twFollowers) ->
return instaFollowers + twFollowers; // can be replaced with method reference
);
System.out.println("Total followers: " + totalFollowers.get()); // blocks until both the above tasks are complete
我使用了supplyAsync()
,因为我正在从任务中返回一些值(在这种情况下为关注者数量),否则我可以使用runAsync()
。这两个都在单独的线程中运行任务。
最后,我使用thenCombine()
加入了CompletableFuture
。如果一个依赖于另一个,您也可以使用thenCompose()
加入两个CompletableFuture
。但是在这种情况下,由于这两个任务可以并行执行,所以我使用了thenCombine()
。
getInstaFollowers(userId)
和 getTwFollowers(userId)
方法是简单的 HTTP 调用之类的。
【讨论】:
【参考方案5】:您可以使用 ThreadPool 和 Executors 来执行此操作。
https://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
【讨论】:
【参考方案6】:这是我使用线程的示例。它是一个固定大小为 50 个线程的静态执行器服务。
public class ThreadPoolExecutor
private static final ExecutorService executorService = Executors.newFixedThreadPool(50,
new ThreadFactoryBuilder().setNameFormat("thread-%d").build());
private static ThreadPoolExecutor instance = new ThreadPoolExecutor();
public static ThreadPoolExecutor getInstance()
return instance;
public <T> Future<? extends T> queueJob(Callable<? extends T> task)
return executorService.submit(task);
public void shutdown()
executorService.shutdown();
执行器的业务逻辑是这样使用的:(可以使用Callable或Runnable。Callable可以返回一些东西,Runnable不行)
public class MultipleExecutor implements Callable<ReturnType> //your code
以及执行者的调用:
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutor.getInstance();
List<Future<? extends ReturnType>> results = new LinkedList<>();
for (Type Type : typeList)
Future<? extends ReturnType> future = threadPoolExecutor.queueJob(
new MultipleExecutor(needed parameters));
results.add(future);
for (Future<? extends ReturnType> result : results)
try
if (result.get() != null)
result.get(); // here you get the return of one thread
catch (InterruptedException | ExecutionException e)
logger.error(e, e);
【讨论】:
【参考方案7】:与jQuery
中的$.Deferred
的行为相同,您可以使用名为CompletableFuture
的类在Java 8
中存档。这个类提供了使用Promises
的API。为了创建异步代码,您可以使用其中一种static
创建方法,例如#runAsync
、#supplyAsync
。然后使用#thenApply
对结果进行一些计算。
【讨论】:
【参考方案8】:我通常选择异步通知开始、通知进度、通知结束方法:
class Task extends Thread
private ThreadLauncher parent;
public Task(ThreadLauncher parent)
super();
this.parent = parent;
public void run()
doStuff();
parent.notifyEnd(this);
public /*abstract*/ void doStuff()
// ...
class ThreadLauncher
public void stuff()
for (int i=0; i<10; i++)
new Task(this).start();
public void notifyEnd(Task who)
// ...
【讨论】:
是的,如果你愿意,你可以实现 Runnable以上是关于如何在 Java 中等待多个任务完成?的主要内容,如果未能解决你的问题,请参考以下文章