Java ExecutorService - 任务/可调用不取消/中断

Posted

技术标签:

【中文标题】Java ExecutorService - 任务/可调用不取消/中断【英文标题】:Java ExecutorService - Task/Callable not cancelling/interrupting 【发布时间】:2018-01-05 12:09:38 【问题描述】:

我正在使用 Java ExecutorService (ThreadPool) 来执行任务并在特定活动处于前台(可见)时更新 UI。

问题: 我想要的是当用户切换到另一个活动时我想停止/取消所有任务(无论是排队还是正在运行)。为此,在调用 isDone() 检查 Future 对象状态后,我必须在 ExecutorService 提交方法返回的 Future 对象上使用 ExecutorService 关闭/shutdownNow 方法或 cancel(true)。这会将中断的相应线程标志设置为 TRUE,我必须在我的可调用实现中检查 (Thread.currentThread.isInterrupted()) 以确定是否被中断退出任务/线程。问题是我是否在这两种情况下都调用 ExecutorService 关闭方法或 Future cancel(true) 方法,很少有 10 次将线程中断标志设置为 TRUE,这最终导致内存泄漏等。

代码:

ThreadPool Singleton 实现(cancelAll-取消任务&shutdownExecutor-关闭ExecutorService):

private static class ThreadPoolManager 

    private ExecutorService executorService;
    private List<Future> queuedFutures;
    private BlockingQueue<Runnable> blockingQueue;

    private static ThreadPoolManager instance;

    private ThreadPoolManager() 
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
        queuedFutures = new ArrayList<>();
        blockingQueue = new LinkedBlockingDeque<>();
        executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
    

    static 
        instance = new ThreadPoolManager();
    

    public static void submitItemTest(Callable<Object> callable) 
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
        if(instance.executorService.isShutdown())
            instance=new ThreadPoolManager();
        
        Future future = instance.executorService.submit(callable);
        instance.queuedFutures.add(future);
    

    public static void submitTestAll(Callable<Object> callable) 
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
        if(instance.executorService.isShutdown())
            instance=new ThreadPoolManager();
        
        cancelAll();
        Future future = instance.executorService.submit(callable);
        instance.queuedFutures.add(future);
    

    public static void cancelAll() 
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
        instance.blockingQueue.clear();
        for (Future future : instance.queuedFutures) 
            if (!future.isDone()) 
                boolean cancelled = future.cancel(true);
                MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
            
        
        instance.queuedFutures.clear();
    

    public static void shutdownExecutor()
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
        instance.executorService.shutdownNow();
    

可调用实现(正常迭代和 if 子句检查中断):

private Callable<Object> getTestAllCallable() 
        return new Callable<Object>() 
            @Override
            public Object call() 
                for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) 
                    if (!Thread.currentThread().isInterrupted()) 
                          //someWork

                     else 
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
                        return null;
                    
                
                return null;
            
        ;
    

Activity/Fragment onStop 实现(用于调用取消任务和关闭):

@Override
public void onStop() 
    MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called");
    ThreadPoolManager.cancelAll();
    ThreadPoolManager.shutdownExecutor();
    super.onStop();

更新:

所做的更改:

    不再使用 Runnable 而不是 callable。

    现在不对 ExecutorService 使用单例。

      private class ThreadPoolManager 
    
        private ExecutorService executorService;
        private List<Future> queuedFutures;
        private BlockingQueue<Runnable> blockingQueue;
    
        private ThreadPoolManager() 
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
            queuedFutures = new ArrayList<>();
            blockingQueue = new LinkedBlockingDeque<>();
            executorService =getNewExecutorService();
        
    
        private ExecutorService getNewExecutorService()
            return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
        
    
        private void submitItemTest(Runnable runnable) 
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
            if(executorService.isShutdown())
                executorService=getNewExecutorService();
            
            Future future = executorService.submit(runnable);
            queuedFutures.add(future);
        
    
        private void submitTestAll(Runnable runnable) 
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
            if(executorService.isShutdown())
                executorService=getNewExecutorService();
            
            cancelAll();
            Future future = executorService.submit(runnable);
            queuedFutures.add(future);
        
    
        private void cancelAll() 
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
            blockingQueue.clear();
            for (Future future : queuedFutures) 
                if (!future.isDone()) 
                    boolean cancelled = future.cancel(true);
                    MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
                
            
            queuedFutures.clear();
        
    
        private void shutdownExecutor()
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
            executorService.shutdownNow();
            blockingQueue.clear();
            queuedFutures.clear();
        
    
    

找到了罪魁祸首,但还没有解决办法。以下 2 是 Runnables 1 的实现,其中运行(isInterrupted 返回 true 或出现 InterupptedException 并且任务结束)但不是其他的。

Working Runnable(我用它来测试):

new Runnable() 
          @Override
          public void run() 
                    int i=0;
                    while(!Thread.currentThread().isInterrupted())
                        try 
                            System.out.println(i);
                            Thread.currentThread().sleep(2000);
                         catch (InterruptedException e) 
                            MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted");
                            return;
                        
                        i++;
                    
                
            

不工作(我想使用的实际代码):

new Runnable()
            @Override
            public void run() 
                for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) 
                    if (!Thread.currentThread().isInterrupted()) 

                     else 
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)");
                        break;
                    
                
            
        ;

一种可能的解决方案是使用变量(布尔值)作为可运行文件中的中断标志,我将考虑作为最后的手段,但很高兴知道错误。

【问题讨论】:

【参考方案1】:

根据ExecutorService 文档,关闭正在执行的任务是尽最大努力完成的。

因此,当您调用ExecutorService.shutdownNow() 时,实现将尝试 关闭所有当前正在执行的任务。每个任务都会继续运行,直到它检测到它被中断。

为了确保您的线程在早期阶段达到该点,最好在循环中添加一个检查线程是否被中断,如下所示:

Thread.currentThread().isInterrupted();

通过在每次迭代中进行此调用,您的线程将在距实际中断很短的时间间隔内检测到中断。

因此您修改后的Callable 代码如下所示:

private Callable<Object> getTestAllCallable() 
    return new Callable<Object>() 
        @Override
        public Object call() 
            for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) 
                if(Thread.currentThread().isInterrupted()) 
                    return null;
                
                if(someCondition) 
                    //someWork
                 else 
                    MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
                    return null;
                
            
            return null;
        
    ;

顺便说一句,如果您不打算从call() 方法返回任何值,那么使用Callable 是没有意义的。如果您在任务中需要参数化类型,只需创建一个参数化 Runnable,如下所示:

public class ParameterizedRunnable<T> implements Runnable 
    private final T t;

    public ParameterizedRunnable(T t) 
        this.t = t;
    

    public void run() 
        //do some work here
    

【讨论】:

抱歉,我现在编辑了我的代码,以包含您所说的内容,实际上我已经按照问题描述中的说明这样做了,但与其他不相关的代码一起被清除了。 我在将可调用对象提交给 ExecutorService 时使用可调用对象而不是可运行对象来获取 Future 对象,现在知道同样可以使用可运行对象来完成。感谢您告诉我,我将更改代码以使用 runnable。 它工作 10 次。我也不敢相信,因为我已经进行了很多搜索并研究了到处要求相同内容检查 isInterrupted 的文档,并相应地做一些工作或离开。【参考方案2】:

解决方案(出路): 所以最后我继续使用自定义内部标志(布尔值)作为线程中断标志,MyRunnable 将在每次迭代时检查它(自定义实现可运行的自定义标志,以便有一个与每个可运行相关联的标志)。当需要取消 ExecutorService(ThreadPool) 下的线程时,我会遍历所有 Future 对象并将其关联 MyRunnable,然后将其中断标志(自定义标志)设置为 true,而不是中断/关闭线程。

线程池管理器:

private class ThreadPoolManager 

        private ExecutorService executorService;
        private final Map<Future,MyRunnable> queuedFutures;
        private final BlockingQueue<Runnable> blockingQueue;

        private ThreadPoolManager() 
            MyLogger.log(DEBUG, "Threadpool-created(constructor)");
            queuedFutures = new HashMap<>();
            blockingQueue = new LinkedBlockingDeque<>();
            executorService = getNewExecutorService();
        

        private ExecutorService getNewExecutorService() 
            return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
        

        private void submitItemTest(MyRunnable runnable) 
            MyLogger.log(DEBUG, "Threadpool-submitted item test");
            if (executorService.isShutdown()) 
                executorService = getNewExecutorService();
            
            Future future = executorService.submit(runnable);
            queuedFutures.put(future,runnable);
        

        private void submitTestAll(MyRunnable runnable) 
            MyLogger.log(DEBUG, "Threadpool-submitted test all");
            if (executorService.isShutdown()) 
                executorService = getNewExecutorService();
            
            cancelAll();
            Future future = executorService.submit(runnable);
            queuedFutures.put(future,runnable);
        

        private void cancelAll() 
            MyLogger.log(DEBUG, "ThreadPool: Cancelling all future tasks");
            blockingQueue.clear();
            for (Future future : queuedFutures.keySet()) 
                if (!future.isDone()) 
                    queuedFutures.get(future).continueRunning=false;
                    MyLogger.log(DEBUG, "Cancelled");
                
            
            queuedFutures.clear();
        

        private void shutdownExecutor() 
            cancelAll();
            MyLogger.log(DEBUG, "ThreadPool: Shuttingdown threadpool");
            executorService.shutdown();
        
    

MyRunnable(实现 Runable 的抽象类):

private abstract class MyRunnable implements Runnable 
        boolean continueRunning=true;
    

MyRunnable(抽象类 MyRunnable 的实例):

new MyRunnable() 
       @Override
       public void run() 
           for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) 
                 if (continueRunning) 
                        //someWork
                  else 
                    MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadPool: Pool Thread Interrupted (closing down)");
                     break;
                 
            
            System.out.println("ThreadPool: Test complete");
         
     ;

现在,调用 threadPoolManager.shutdownExecutor() 会关闭/中断当前运行的所有线程。

【讨论】:

以上是关于Java ExecutorService - 任务/可调用不取消/中断的主要内容,如果未能解决你的问题,请参考以下文章

java中ExecutorService的线程池如何暂停所有的任务和继续所有的任务? 有这样的函数吗?

Java多线程系列七——ExecutorService

Java ExecutorService - 任务/可调用不取消/中断

Java并发包中的几种ExecutorService

Java:在特定队列大小后阻止提交的 ExecutorService [重复]

聊聊高并发(三十九)解析java.util.concurrent各个组件(十五) 理解ExecutorService接口的设计