如何使用 invokeAll() 让所有线程池完成任务?
Posted
技术标签:
【中文标题】如何使用 invokeAll() 让所有线程池完成任务?【英文标题】:How to use invokeAll() to let all thread pool do their task? 【发布时间】:2013-08-14 16:09:47 【问题描述】: ExecutorService pool=Executors.newFixedThreadPool(7);
List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();
for(int i=0;i<=diff;i++)
String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);
callList.add(new HotelCheapestFare(str));
future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++)
System.out.println("name is:"+future.get(i).get().getName());
现在我想在进入 for 循环之前将所有任务池化到 invokeAll
,但是当我运行这个程序时,for 循环会在 invokeAll
之前执行并抛出这个异常:
java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:65)
Caused by: java.lang.NullPointerException at
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheapestFare(HotelCheapestFare.java:166)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:1)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run
【问题讨论】:
InvokeAll 应该等到所有可调用对象都完成。能否请您添加异常和堆栈跟踪? 内部一些线程在完成前进入for循环并抛出异常 请帮帮我,我必须提交我的任务 根据您的堆栈跟踪:com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheapestFare(HotelCheapestFare.java:166)
。应该有错误。
我现在有一个列表,但是当我调试它时,我得到了我的列表
【参考方案1】:
ExecutorService
的工作方式是,当您调用 invokeAll
时,它会等待所有任务完成:
执行给定的任务,返回一个持有它们的 Futures 列表 全部完成后的状态和结果。 Future.isDone() 对每个都是真的 返回列表的元素。 请注意,已完成的任务可能有 正常终止或抛出异常。结果 如果给定集合被修改,则此方法未定义 此操作正在进行中。1(已添加重点)
这意味着您的任务已全部完成,但有些任务可能引发了异常。此异常是 Future
的一部分 - 调用 get
会导致异常被重新抛出并包裹在 ExecutionException
中。
来自你的堆栈跟踪
java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at
^^^ <-- from get
您可以看到确实如此。您的一项任务因 NPE 而失败。 ExecutorService
捕获了异常,并在您调用 Future.get
时通过抛出 ExecutionException
来告诉您。
现在,如果您想在任务完成时接受任务,您需要一个ExecutorCompletionService
。这充当BlockingQueue
,允许您在任务完成时轮询任务。
public static void main(String[] args) throws Exception
final ExecutorService executorService = Executors.newFixedThreadPool(10);
final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
executorService.submit(new Runnable()
@Override
public void run()
for (int i = 0; i < 100; ++i)
try
final Future<String> myValue = completionService.take();
//do stuff with the Future
final String result = myValue.get();
System.out.println(result);
catch (InterruptedException ex)
return;
catch (ExecutionException ex)
System.err.println("TASK FAILED");
);
for (int i = 0; i < 100; ++i)
completionService.submit(new Callable<String>()
@Override
public String call() throws Exception
if (Math.random() > 0.5)
throw new RuntimeException("FAILED");
return "SUCCESS";
);
executorService.shutdown();
在这个例子中,我有一个任务在ExecutorCompletionService
上调用take
,它在Future
s 可用时获取它们,然后我将任务提交到ExecutorCompletionService
。
这将允许您在失败时立即获取失败的任务,而不必等待所有任务一起失败或成功。
唯一的复杂之处是很难告诉轮询线程所有任务都已完成,因为现在一切都是异步的。在这种情况下,我使用了将提交 100 个任务的知识,因此它只需要轮询 100 次。更通用的方法是从 submit
方法中收集 Future
s,然后循环它们以查看是否一切都已完成。
【讨论】:
我可以使用带有可调用对象列表的 ExecutorCompletionServiceExecutorService
中使用Collection
的原因是,您可以等待所有 完成后再继续。使用 `ExecutorCompletionService`,您可以轮询完成,因此一次或循环提交它们没有区别。
我的任务很耗时,所以我希望它们同时运行,以便它们同时运行,并且在线程完成后得到我未来的结果
在这种情况下,您可以使用ExecutorService
的invokeAll
方法并按照您当前的操作处理所有Future
s。使用ExecutorCompletionListener
的原因是,您会在每项任务完成后立即收到通知。这是两个不同的东西。我不明白你想要什么;您可以要么在每个结果发生时处理它或在最后处理所有内容。 在任何一种情况下您都需要处理可能的ExecutionException
s。
是的,我希望 forloop 仅在 invokeall() 完成后工作,但它在一两个线程完成后才工作,所以请告诉我如何防止我的 for 循环,以便它仅在列表完成。【参考方案2】:
Future.get() 抛出以下异常。
CancellationException
- 如果计算被取消
ExecutionException
- 如果计算抛出异常
InterruptedException
- 如果当前线程在等待时被中断
在调用get()
方法时捕获所有这些异常。
我已经模拟了一些 Callable
任务的除以零异常,但如果您捕获上述三个异常,如示例代码所示,则一个 Callable
中的异常不会影响提交给 ExecutorService
的其他 Callable
任务。
示例代码sn-p:
import java.util.concurrent.*;
import java.util.*;
public class InvokeAllUsage
public InvokeAllUsage()
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(10);
List<MyCallable> futureList = new ArrayList<MyCallable>();
for ( int i=0; i<10; i++)
MyCallable myCallable = new MyCallable((long)i+1);
futureList.add(myCallable);
System.out.println("Start");
try
List<Future<Long>> futures = service.invokeAll(futureList);
for(Future<Long> future : futures)
try
System.out.println("future.isDone = " + future.isDone());
System.out.println("future: call ="+future.get());
catch (CancellationException ce)
ce.printStackTrace();
catch (ExecutionException ee)
ee.printStackTrace();
catch (InterruptedException ie)
Thread.currentThread().interrupt(); // ignore/reset
catch(Exception err)
err.printStackTrace();
System.out.println("Completed");
service.shutdown();
public static void main(String args[])
InvokeAllUsage demo = new InvokeAllUsage();
class MyCallable implements Callable<Long>
Long id = 0L;
public MyCallable(Long val)
this.id = val;
public Long call()
if ( id % 5 == 0)
id = id / 0;
return id;
输出:
creating service
Start
future.isDone = true
future: call =1
future.isDone = true
future: call =2
future.isDone = true
future: call =3
future.isDone = true
future: call =4
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
future.isDone = true
future: call =6
future.isDone = true
future: call =7
future.isDone = true
future: call =8
future.isDone = true
future: call =9
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Completed
【讨论】:
很好的解释+1,那么使用 service.invokeAll(futureList) 还是递归使用 service.submit(new Runnable) 更好?两种方法的执行时间是否有显着差异?还是只是为了不同的目标,即返回值? 当需要等待所有任务的结果时,使用invokeAll。否则你可以使用提交任务。【参考方案3】:invokeAll 是一种阻塞方法。这意味着——在所有线程完成之前,JVM 不会继续执行下一行。所以我认为你的线程未来结果有问题。
System.out.println("name is:"+future.get(i).get().getName());
从这一行我认为有一些futures没有结果并且可以为null,所以你应该检查你的代码,如果有一些Futures为null,如果有,在这行执行之前获取一个if。
【讨论】:
这是大错特错。查看堆栈跟踪。Future.get
方法正在抛出 ExectuionExcetion
- 这很明显。结果不是null
。 Callable
在call
方法中遇到了NullPointerException
。这被Future
捕获,然后被包裹在ExectionException
中。
对不起,我错过了java.util.concurrent.ExecutionException,所以是Future Code得到这个异常。
其他更好的解决方案以上是关于如何使用 invokeAll() 让所有线程池完成任务?的主要内容,如果未能解决你的问题,请参考以下文章
ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析