如何使用 invokeAll() 让所有线程池完成任务?

Posted

技术标签:

【中文标题】如何使用 invokeAll() 让所有线程池完成任务?【英文标题】:How to use invokeAll() to let all thread pool do their task? 【发布时间】:2013-08-14 16:09:47 【问题描述】:
    ExecutorService pool=Executors.newFixedThreadPool(7);
        List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
        List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();

        for(int i=0;i<=diff;i++)

            String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);

            callList.add(new HotelCheapestFare(str));

               
     future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++)

        System.out.println("name is:"+future.get(i).get().getName());
    

现在我想在进入 for 循环之前将所有任务池化到 invokeAll,但是当我运行这个程序时,for 循环会在 invokeAll 之前执行并抛出这个异常:

java.util.concurrent.ExecutionException: java.lang.NullPointerException at 
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at  
java.util.concurrent.FutureTask.get(Unknown Source) at 
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:6‌​5)

Caused by: java.lang.NullPointerException at 
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheap‌estFare(HotelCheapes‌​tFare.java:166) 
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:1) 
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknow‌​n Source)
at java.lang.Thread.run

【问题讨论】:

InvokeAll 应该等到所有可调用对象都完成。能否请您添加异常和堆栈跟踪? 内部一些线程在完成前进入for循环并抛出异常 请帮帮我,我必须提交我的任务 根据您的堆栈跟踪:com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheapestFare(HotelCheapes‌​tFare.java:166)。应该有错误。 我现在有一个列表,但是当我调试它时,我得到了我的列表 【参考方案1】:

ExecutorService 的工作方式是,当您调用 invokeAll 时,它会等待所有任务完成:

执行给定的任务,返回一个持有它们的 Futures 列表 全部完成后的状态和结果。 Future.isDone() 对每个都是真的 返回列表的元素。 请注意,已完成的任务可能有 正常终止或抛出异常。结果 如果给定集合被修改,则此方法未定义 此操作正在进行中。1(已添加重点)

这意味着您的任务已全部完成,但有些任务可能引发了异常。此异常是 Future 的一部分 - 调用 get 会导致异常被重新抛出并包裹在 ExecutionException 中。

来自你的堆栈跟踪

java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at 
                                ^^^ <-- from get

您可以看到确实如此。您的一项任务因 NPE 而失败。 ExecutorService 捕获了异常,并在您调用 Future.get 时通过抛出 ExecutionException 来告诉您。

现在,如果您想在任务完成时接受任务,您需要一个ExecutorCompletionService。这充当BlockingQueue,允许您在任务完成时轮询任务。

public static void main(String[] args) throws Exception 
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
    executorService.submit(new Runnable() 
        @Override
        public void run() 
            for (int i = 0; i < 100; ++i) 
                try 
                    final Future<String> myValue = completionService.take();
                    //do stuff with the Future
                    final String result = myValue.get();
                    System.out.println(result);
                 catch (InterruptedException ex) 
                    return;
                 catch (ExecutionException ex) 
                    System.err.println("TASK FAILED");
                
            
        
    );
    for (int i = 0; i < 100; ++i) 
        completionService.submit(new Callable<String>() 
            @Override
            public String call() throws Exception 
                if (Math.random() > 0.5) 
                    throw new RuntimeException("FAILED");
                
                return "SUCCESS";
            
        );
    
    executorService.shutdown();

在这个例子中,我有一个任务在ExecutorCompletionService 上调用take,它在Futures 可用时获取它们,然后我将任务提交到ExecutorCompletionService

这将允许您在失败时立即获取失败的任务,而不必等待所有任务一起失败或成功。

唯一的复杂之处是很难告诉轮询线程所有任务都已完成,因为现在一切都是异步的。在这种情况下,我使用了将提交 100 个任务的知识,因此它只需要轮询 100 次。更通用的方法是从 submit 方法中收集 Futures,然后循环它们以查看是否一切都已完成。

【讨论】:

我可以使用带有可调用对象列表的 ExecutorCompletionService @SahilKohli 不,这没有任何意义。在ExecutorService 中使用Collection 的原因是,您可以等待所有 完成后再继续。使用 `ExecutorCompletionService`,您可以轮询完成,因此一次或循环提交它们没有区别。 我的任务很耗时,所以我希望它们同时运行,以便它们同时运行,并且在线程完成后得到我未来的结果 在这种情况下,您可以使用ExecutorServiceinvokeAll 方法并按照您当前的操作处理所有Futures。使用ExecutorCompletionListener 的原因是,您会在每项任务完成后立即收到通知。这是两个不同的东西。我不明白你想要什么;您可以要么在每个结果发生时处理它在最后处理所有内容。 在任何一种情况下您都需要处理可能的ExecutionExceptions。 是的,我希望 forloop 仅在 invokeall() 完成后工作,但它在一两个线程完成后才工作,所以请告诉我如何防止我的 for 循环,以便它仅在列表完成。【参考方案2】:

Future.get() 抛出以下异常。

CancellationException - 如果计算被取消

ExecutionException - 如果计算抛出异常

InterruptedException - 如果当前线程在等待时被中断

在调用get() 方法时捕获所有这些异常。

我已经模拟了一些 Callable 任务的除以零异常,但如果您捕获上述三个异常,如示例代码所示,则一个 Callable 中的异常不会影响提交给 ExecutorService 的其他 Callable 任务。

示例代码sn-p:

import java.util.concurrent.*;
import java.util.*;

public class InvokeAllUsage
    public InvokeAllUsage()
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(10);

        List<MyCallable> futureList = new ArrayList<MyCallable>();
        for ( int i=0; i<10; i++)
            MyCallable myCallable = new MyCallable((long)i+1);
            futureList.add(myCallable);
        
        System.out.println("Start");
        try
            List<Future<Long>> futures = service.invokeAll(futureList);  
            for(Future<Long> future : futures)
                try
                    System.out.println("future.isDone = " + future.isDone());
                    System.out.println("future: call ="+future.get());
                
                catch (CancellationException ce) 
                    ce.printStackTrace();
                 catch (ExecutionException ee) 
                    ee.printStackTrace();
                 catch (InterruptedException ie) 
                    Thread.currentThread().interrupt(); // ignore/reset
                
            
        catch(Exception err)
            err.printStackTrace();
        
        System.out.println("Completed");
        service.shutdown();
    
    public static void main(String args[])
        InvokeAllUsage demo = new InvokeAllUsage();
    
    class MyCallable implements Callable<Long>
        Long id = 0L;
        public MyCallable(Long val)
            this.id = val;
        
        public Long call()

            if ( id % 5 == 0)
                id = id / 0;
                       
            return id;
        
    

输出:

creating service
Start
future.isDone = true
future: call =1
future.isDone = true
future: call =2
future.isDone = true
future: call =3
future.isDone = true
future: call =4
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
        at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
future.isDone = true
future: call =6
future.isDone = true
future: call =7
future.isDone = true
future: call =8
future.isDone = true
future: call =9
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
        at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Completed

【讨论】:

很好的解释+1,那么使用 service.invokeAll(futureList) 还是递归使用 service.submit(new Runnable) 更好?两种方法的执行时间是否有显着差异?还是只是为了不同的目标,即返回值? 当需要等待所有任务的结果时,使用invokeAll。否则你可以使用提交任务。【参考方案3】:

invokeAll 是一种阻塞方法。这意味着——在所有线程完成之前,JVM 不会继续执行下一行。所以我认为你的线程未来结果有问题。

System.out.println("name is:"+future.get(i).get().getName());

从这一行我认为有一些futures没有结果并且可以为null,所以你应该检查你的代码,如果有一些Futures为null,如果有,在这行执行之前获取一个if。

【讨论】:

这是大错特错。查看堆栈跟踪。 Future.get 方法正在抛出 ExectuionExcetion - 这很明显。结果不是nullCallablecall 方法中遇到了NullPointerException。这被Future 捕获,然后被包裹在ExectionException 中。 对不起,我错过了java.util.concurrent.ExecutionException,所以是Future Code得到这个异常。 其他更好的解决方案

以上是关于如何使用 invokeAll() 让所有线程池完成任务?的主要内容,如果未能解决你的问题,请参考以下文章

ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析

ExecutorService 使用 invokeAll 和超时异常后可调用线程上的超时未终止

vc如何创建 线程池

面试官:如何让主线程等待所有的子线程结束之后再执行?我懵了

多线程和线程池

如何等待java线程池中所有任务完成