Java 执行器:当任务完成时,如何在不阻塞的情况下得到通知?
Posted
技术标签:
【中文标题】Java 执行器:当任务完成时,如何在不阻塞的情况下得到通知?【英文标题】:Java executors: how to be notified, without blocking, when a task completes? 【发布时间】:2010-10-23 23:54:49 【问题描述】:假设我有一个队列,里面装满了我需要提交给执行器服务的任务。我希望他们一次处理一个。我能想到的最简单的方法是:
-
从队列中获取任务
提交给执行人
在返回的 Future 上调用 .get 并阻塞直到结果可用
从队列中获取另一个任务...
但是,我试图完全避免阻塞。如果我有 10,000 个这样的队列,它们需要一次处理一个任务,我将用完堆栈空间,因为它们中的大多数将保留阻塞的线程。
我想要的是提交一个任务并提供一个在任务完成时调用的回调。我将使用该回调通知作为发送下一个任务的标志。 (functionaljava 和 jetlang 显然使用了这样的非阻塞算法,但是我看不懂他们的代码)
如何使用 JDK 的 java.util.concurrent 来做到这一点,而不是编写自己的执行器服务?
(为我提供这些任务的队列本身可能会阻塞,但这是稍后要解决的问题)
【问题讨论】:
【参考方案1】:使用CountDownLatch
。
它来自java.util.concurrent
,这正是等待多个线程完成执行后再继续的方式。
为了实现您所关注的回调效果,这确实需要一些额外的工作。也就是说,您自己在一个单独的线程中处理这个问题,该线程使用CountDownLatch
并等待它,然后继续通知您需要通知的任何内容。没有对回调或任何类似效果的原生支持。
编辑:既然我进一步理解了你的问题,我认为你已经过分了,没有必要。如果您使用常规的SingleThreadExecutor
,则将所有任务交给它,它会在本地进行排队。
【讨论】:
使用 SingleThreadExecutor 知道所有线程都已完成的最佳方法是什么?我看到了一个使用了一段时间 !executor.isTerminated 的示例,但这似乎不是很优雅。我为每个工作人员实现了一个回调功能并增加了一个有效的计数。【参考方案2】:定义一个回调接口来接收你想在完成通知中传递的任何参数。然后在任务结束时调用它。
您甚至可以为 Runnable 任务编写一个通用包装器,并将它们提交给ExecutorService
。或者,请参阅下面的 Java 8 内置机制。
class CallbackTask implements Runnable
private final Runnable task;
private final Callback callback;
CallbackTask(Runnable task, Callback callback)
this.task = task;
this.callback = callback;
public void run()
task.run();
callback.complete();
通过CompletableFuture
,Java 8 包含了一种更精细的方法来组合管道,其中流程可以异步和有条件地完成。这是一个人为但完整的通知示例。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class GetTaskNotificationWithoutBlocking
public static void main(String... argv) throws Exception
ExampleService svc = new ExampleService();
GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
f.thenAccept(listener::notify);
System.out.println("Exiting main()");
void notify(String msg)
System.out.println("Received message: " + msg);
class ExampleService
String work()
sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
char[] str = new char[5];
ThreadLocalRandom current = ThreadLocalRandom.current();
for (int idx = 0; idx < str.length; ++idx)
str[idx] = (char) ('A' + current.nextInt(26));
String msg = new String(str);
System.out.println("Generated message: " + msg);
return msg;
public static void sleep(long average, TimeUnit unit)
String name = Thread.currentThread().getName();
long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
try
unit.sleep(timeout);
System.out.println(name + " awoke.");
catch (InterruptedException abort)
Thread.currentThread().interrupt();
System.out.println(name + " interrupted.");
public static long exponential(long avg)
return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
【讨论】:
一眨眼三个答案!我喜欢 CallbackTask,这是一个简单直接的解决方案。回想起来很明显。谢谢。关于其他关于 SingleThreadedExecutor 的 cmets:我可能有数千个队列,可能有数千个任务。他们每个人都需要一次处理一个任务,但不同的队列可以并行操作。这就是我使用单个全局线程池的原因。我是执行人的新手,所以如果我弄错了,请告诉我。 好的模式,不过我会使用Guava's listenable future API,它提供了一个非常好的实现。 @erickson 你能指定它是哪个Callback
导入的吗?那会有很大帮助。太多了,好难找。
@Zelphir 这是您声明的Callback
接口;不是来自图书馆。现在我可能只使用Runnable
、Consumer
或BiConsumer
,这取决于我需要从任务传回给侦听器的内容。
@Bhargav 这是典型的回调——外部实体“回调”到控制实体。您是否希望创建任务的线程阻塞直到任务完成?那么在第二个线程上运行任务的目的是什么?如果您允许线程继续,它将需要重复检查某些共享状态(可能在循环中,但取决于您的程序),直到它注意到由 true 进行的更新(布尔标志、队列中的新项目等)回调,如本答案中所述。然后它可以执行一些额外的工作。【参考方案3】:
如果您想确保不会同时运行任何任务,请使用SingleThreadedExecutor。任务将按照提交的顺序进行处理。您甚至不需要保留任务,只需将它们提交给 exec。
【讨论】:
【参考方案4】:ThreadPoolExecutor
也有 beforeExecute
和 afterExecute
钩子方法,您可以覆盖和使用它们。这是来自ThreadPoolExecutor
的Javadocs的描述。
挂钩方法
此类提供受保护的可重写
beforeExecute(java.lang.Thread, java.lang.Runnable)
和afterExecute(java.lang.Runnable, java.lang.Throwable)
方法,这些方法在执行每个任务之前和之后调用。这些可用于操纵执行环境;例如,重新初始化ThreadLocals
、收集统计信息或添加日志条目。此外,方法terminated()
可以被覆盖以执行任何需要在Executor
完全终止后完成的特殊处理。如果钩子或回调方法抛出异常,内部工作线程可能会依次失败并突然终止。
【讨论】:
【参考方案5】:您可以扩展FutureTask
类,并覆盖done()
方法,然后将FutureTask
对象添加到ExecutorService
,这样done()
方法将在FutureTask
立即完成时调用。
【讨论】:
then add the FutureTask object to the ExecutorService
,你能告诉我怎么做吗?
@GaryGauh see this for more info 你可以扩展 FutureTask,我们可以称之为 MyFutureTask。然后使用 ExcutorService 提交 MyFutureTask,然后 MyFutureTask 的 run 方法就会运行,当 MyFutureTask 完成时会调用你的 done 方法。这里比较混乱的是两个 FutureTask,实际上 MyFutureTask 是一个普通的 Runnable。【参考方案6】:
使用Guava's listenable future API 并添加回调。参照。来自网站:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>()
public Explosion call()
return pushBigRedButton();
);
Futures.addCallback(explosion, new FutureCallback<Explosion>()
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion)
walkAwayFrom(explosion);
public void onFailure(Throwable thrown)
battleArchNemesis(); // escaped the explosion!
);
【讨论】:
【参考方案7】:在 Java 8 中,您可以使用 CompletableFuture。这是我的代码中的一个示例,我使用它从我的用户服务中获取用户,将它们映射到我的视图对象,然后更新我的视图或显示错误对话框(这是一个 GUI 应用程序):
CompletableFuture.supplyAsync(
userService::listUsers
).thenApply(
this::mapUsersToUserViews
).thenAccept(
this::updateView
).exceptionally(
throwable -> showErrorDialogFor(throwable); return null;
);
它异步执行。我使用了两种私有方法:mapUsersToUserViews
和 updateView
。
【讨论】:
如何将 CompletableFuture 与 executor 一起使用? (限制并发/并行实例的数量)这是一个提示:cf:submitting-futuretasks-to-an-executor-why-does-it-work ?【参考方案8】:只是添加到马特的回答,这有帮助,这里有一个更充实的例子来展示回调的使用。
private static Primes primes = new Primes();
public static void main(String[] args) throws InterruptedException
getPrimeAsync((p) ->
System.out.println("onPrimeListener; p=" + p));
System.out.println("Adios mi amigito");
public interface OnPrimeListener
void onPrime(int prime);
public static void getPrimeAsync(OnPrimeListener listener)
CompletableFuture.supplyAsync(primes::getNextPrime)
.thenApply((prime) ->
System.out.println("getPrimeAsync(); prime=" + prime);
if (listener != null)
listener.onPrime(prime);
return prime;
);
输出是:
getPrimeAsync(); prime=241
onPrimeListener; p=241
Adios mi amigito
【讨论】:
【参考方案9】:您可以使用 Callable 的实现,这样
public class MyAsyncCallable<V> implements Callable<V>
CallbackInterface ci;
public MyAsyncCallable(CallbackInterface ci)
this.ci = ci;
public V call() throws Exception
System.out.println("Call of MyCallable invoked");
System.out.println("Result = " + this.ci.doSomething(10, 20));
return (V) "Good job";
CallbackInterface 是非常基本的东西,比如
public interface CallbackInterface
public int doSomething(int a, int b);
现在主类看起来像这样
ExecutorService ex = Executors.newFixedThreadPool(2);
MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
【讨论】:
【参考方案10】:这是使用 Guava 的 ListenableFuture
对 Pache 答案的扩展。
特别是,Futures.transform()
返回ListenableFuture
,因此可用于链接异步调用。 Futures.addCallback()
返回 void
,因此不能用于链接,但适用于处理异步完成时的成功/失败。
// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());
// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
Futures.transform(database, database -> database.query(table, ...));
// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
Futures.transform(cursor, cursor -> cursorToFooList(cursor));
// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>()
public void onSuccess(List<Foo> foos)
doSomethingWith(foos);
public void onFailure(Throwable thrown)
log.error(thrown);
);
注意:除了链接异步任务外,Futures.transform()
还允许您将每个任务安排在单独的执行程序上(此示例中未显示)。
【讨论】:
这看起来不错。【参考方案11】:使用ExecutorService
实现Callback
机制的简单代码
import java.util.concurrent.*;
import java.util.*;
public class CallBackDemo
public CallBackDemo()
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(5);
try
for ( int i=0; i<5; i++)
Callback callback = new Callback(i+1);
MyCallable myCallable = new MyCallable((long)i+1,callback);
Future<Long> future = service.submit(myCallable);
//System.out.println("future status:"+future.get()+":"+future.isDone());
catch(Exception err)
err.printStackTrace();
service.shutdown();
public static void main(String args[])
CallBackDemo demo = new CallBackDemo();
class MyCallable implements Callable<Long>
Long id = 0L;
Callback callback;
public MyCallable(Long val,Callback obj)
this.id = val;
this.callback = obj;
public Long call()
//Add your business logic
System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
callback.callbackMethod();
return id;
class Callback
private int i;
public Callback(int i)
this.i = i;
public void callbackMethod()
System.out.println("Call back:"+i);
// Add your business logic
输出:
creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4
重点说明:
-
如果您希望按 FIFO 顺序依次处理任务,请将
newFixedThreadPool(5)
替换为 newFixedThreadPool(1)
如果你想在分析上一个任务的callback
的结果后处理下一个任务,只需取消下面的注释
//System.out.println("future status:"+future.get()+":"+future.isDone());
您可以将newFixedThreadPool()
替换为
Executors.newCachedThreadPool()
Executors.newWorkStealingPool()
ThreadPoolExecutor
取决于您的用例。
如果你想异步处理回调方法
一个。将共享的ExecutorService or ThreadPoolExecutor
传递给 Callable 任务
b.将您的 Callable
方法转换为 Callable/Runnable
任务
c。推送回调任务到ExecutorService or ThreadPoolExecutor
【讨论】:
以上是关于Java 执行器:当任务完成时,如何在不阻塞的情况下得到通知?的主要内容,如果未能解决你的问题,请参考以下文章
如何在不阻塞的情况下使用 mpd.idle() 从 GTK 轮询 MPD
Java面试小短文当任务数超过线程池的核心线程数,如何让它不进入阻塞队列直接启用最大数量的线程去执行任务?