如何使用 executor.execute 和 future.get() 结束任务(使线程超时)(通过上升中断)而不阻塞主线程

Posted

技术标签:

【中文标题】如何使用 executor.execute 和 future.get() 结束任务(使线程超时)(通过上升中断)而不阻塞主线程【英文标题】:How to end a task(timeout a thread) (by rising interrupt) using executor.execute and future.get() without blocking the main thread 【发布时间】:2021-12-31 09:34:05 【问题描述】:

我尝试在执行器服务中使用 future,它可以在不阻塞主线程的情况下工作。然而,这最终会创建双倍数量的线程,这是不可取的。在下面的代码中,我能够在没有阻塞的情况下使用 future.get(),但我必须创建双倍的线程数

 for (int i : array) 
                executor.execute(() -> 
                    Future f = new FutureTask(() -> 
                        Goring goring = new Goring();
    
                        goring.goring(i);
    
                    , null);
                    Thread thread = new Thread((Runnable) f);
                    thread.start();
    
                    try 
                        f.get(1, TimeUnit.NANOSECONDS);
                     catch (InterruptedException | ExecutionException | TimeoutException|RuntimeException e) 
                        // TODO Auto-generated catch block
                        // System.out.println(e.getMessage());
                        e.printStackTrace();
                        f.cancel(true);
                    
    
                );
            
            
    
        executor.shutdown();
    
        

【问题讨论】:

没有任务会在一纳秒内完成,因此您可以通过根本不执行任何操作来简化所有代码。在更一般的情况下,您可以为每个 goring 调用创建一个 Callable,然后将所有 Callable 一次性提交给 ExecutorService.invokeAll,并设置超时时间。 您好,感谢您的回复,实际上这段代码是虚拟的,我必须在更大的应用程序中实现这个逻辑,如果说我的客户只想并行执行 3 个任务而不是全部 10 个任务,我必须一次分配所有任务跨度> 【参考方案1】:

通常我会推荐使用ExecutorService.invokeAll,它允许提交任意数量的超时任务,并在超时到期时取消任何尚未完成的任务。

但您已在评论中声明您希望能够以较小的并行组提交任务,例如一次并发运行的任务不超过三个。

您可以创建一个并行度有限的线程池:

ExecutorService executor = Executors.newFixedThreadPool(3);

这里的挑战是您希望在任务开始时开始超时,而不是在提交任务时开始。由于提交的任务无法访问自己的 Future,我将使用 SynchronousQueue 等待从提交线程传递的 Future:

ScheduledExecutorService cancelScheduler =
    Executors.newSingleThreadScheduledExecutor();

for (int i : array) 
    SynchronousQueue<Runnable> cancelQueue = new SynchronousQueue<>();

    Future<?> task = executor.submit(() -> 
        Runnable canceler = cancelQueue.take();
        cancelScheduler.schedule(canceler, 5, TimeUnit.SECONDS);

        Goring goring = new Goring();
        goring.goring(i);

        return null;
    );

    cancelQueue.put(() -> task.cancel(true));


executor.shutdown();
cancelScheduler.shutdown();

顾名思义,Executors.newSingleThreadScheduledExecutor() 只使用一个线程进行所有调度,这非常适合您的目的,因为调度任务只调用 Future.cancel,所花费的时间可以忽略不计。

在任务中包含return null; 的原因是从 lambda 返回值使其成为 Callable 而不是 Runnable。 (Runnable 的 run 方法的返回类型为 void,因此返回值的存在告诉编译器这个 lambda 不可能代表 Runnable。)这意味着它的签名中隐含了 throws Exception,所以没有需要捕获cancelQueue.take() 可能抛出的InterruptedException。

【讨论】:

以上是关于如何使用 executor.execute 和 future.get() 结束任务(使线程超时)(通过上升中断)而不阻塞主线程的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程 - Executor框架Executor,

ExecutorService.execute()

使用多线程

Java线程同步

项目中并发

记一次job不跑的问题