如何等待java线程池中所有任务完成

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何等待java线程池中所有任务完成相关的知识,希望对你有一定的参考价值。

你可以使用这个CountDownLatch

public class CountDownLatchDemo   
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
    public static void main(String[] args) throws InterruptedException   
        CountDownLatch latch=new CountDownLatch(2);//两个工人的协作  
        Worker worker1=new Worker("zhang san", 5000, latch);  
        Worker worker2=new Worker("li si", 8000, latch);  
        worker1.start();//  
        worker2.start();//  
        latch.await();//等待所有工人完成工作  
        System.out.println("all work done at "+sdf.format(new Date()));  
      
      
      
    static class Worker extends Thread  
        String workerName;   
        int workTime;  
        CountDownLatch latch;  
        public Worker(String workerName ,int workTime ,CountDownLatch latch)  
             this.workerName=workerName;  
             this.workTime=workTime;  
             this.latch=latch;  
          
        public void run()  
            System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));  
            doWork();//工作了  
            System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));  
            latch.countDown();//工人完成工作,计数器减一  
  
          
          
        private void doWork()  
            try   
                Thread.sleep(workTime);  
             catch (InterruptedException e)   
                e.printStackTrace();  
              
          
      
      
       

参考技术A 用java.util.concurrent下面的类实现线程池就可以

Java基础干货如何优雅关闭线程池实践总结

背景

之前架构中,消费端从Redis队列里拉取任务放到线程池中,如果部署新代码上线,如何保证之前线程池中正在执行的任务和队列里的等待的任务能执行完成而不是直接丢失。

解决方案

方案是在容器销毁时对该线程池Bean做shutdown()操作,下面详细介绍原理。

考虑到使用的Spring中线程池,线程池Bean的类型为org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor,继承了ExecutorConfigurationSupport抽象类,简单来说这个抽象类具有Spring Bean和线程池特征。下图为ThreadPoolTaskExecutor继承实现关系图。
在这里插入图片描述
ExecutorConfigurationSupport 的destroy()方法中 ,如果该线程池waitForTasksToCompleteOnShutdown为true时,执行线程池的shutdown()方法,否则执行线程池shutdownNow()方法。下面为ExecutorConfigurationSupport 的destroy()方法

源码剖析

/**
	 * Calls {@code shutdown} when the BeanFactory destroys
	 * the task executor instance.
	 * @see #shutdown()
	 */
	@Override
	public void destroy() {
		shutdown();
	}

	/**
	 * Perform a shutdown on the underlying ExecutorService.
	 * @see java.util.concurrent.ExecutorService#shutdown()
	 * @see java.util.concurrent.ExecutorService#shutdownNow()
	 * @see #awaitTerminationIfNecessary()
	 */
	public void shutdown() {
    	//waitForTasksToCompleteOnShutdown 默认为false
		if (this.waitForTasksToCompleteOnShutdown) {
			this.executor.shutdown();
		}
		else {
			this.executor.shutdownNow();
		}
		awaitTerminationIfNecessary();
	}

其中 this.executor.shutdown() 逻辑是熟悉的线程池ThreadPoolExecutor

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */    
	public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
	   		// 检查权限
            checkShutdownAccess();
            // 设置线程池状态
	    	advanceRunState(SHUTDOWN);
	   		// 中断空闲线程
            interruptIdleWorkers();
            // 钩子函数,主要用于清理一些资源
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

关于shutdown方法总结有以下几个关键点

  • shutdown 会首先将线程设置成 SHUTDOWN 状态,然后中断所有没有正在运行的线程(空闲线程)。
  • 正在执行的线程和已经在队列中的线程并不会被中断,其实就是要等待所有任务正常全部结束以后才会关闭线程池,符合“优雅”关闭线程池需求
  • 调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。
  • shutdown()返回值为空。

对比shutdownNow()方法

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
	    	// 检查状态
            checkShutdownAccess();
	    	// 将线程池状态变为 STOP
            advanceRunState(STOP);
            // 中断所有线程,包括正在工作线程以及空闲线程
	    	interruptWorkers();
	    	// 丢弃工作队列中存量任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
       

shutdownNow()有以下几点需要明白

  • shutdown 会首先将线程设置成 STOP 状态。
  • 然后中断所有线程即执行interrupt,抛出InterruptedException。
  • shutdownNow()返回值为List类型,里面为全部取消执行的任务。

上面awaitTerminationIfNecessary()具体实现,目的是控制等待的时间,防止任务无限期的运行。

	/**
	 * Wait for the executor to terminate, according to the value of the
	 * {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"} property.
	 */
	private void awaitTerminationIfNecessary() {
    //awaitTerminationSeconds默认为0,等待线程池到Teminate的时间
		if (this.awaitTerminationSeconds > 0) {
			try {
				if (!this.executor.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) {
          //省略日志逻辑
				}
			}
			catch (InterruptedException ex) {
        //省略日志逻辑
				Thread.currentThread().interrupt();
			}
		}
	}

实践操作

第一步对ThreadPoolTaskExecutor的WaitForTasksToCompleteOnShutdown和AwaitTerminationSeconds属性进行配置。

threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);

第二步在Bean配置中添加 destroy-method=“destroy”,我们知道Spring Bean的生命周期,使在Spring容器销毁时,该线程池Bean执行destroy()方法。

    <bean id="threadPool"
          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" destroy-method="destroy">
    </bean>

扩展

感兴趣的话可以自己去跑下代码,比较下shutdown()和shutdownNow()的区别,通过debug和跑出来的结果理解得会比较深刻
例子一 shutdown()

    public void shutdownTest() throws InterruptedException {

        ExecutorService service = new ThreadPoolExecutor(10,10,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());

        for (int i = 0; i < 1000; i++) {
            service.submit(() ->{
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    System.out.println("接受中断,不处理~~");
                }
                	System.out.println(Thread.currentThread().getName());
            });
        }
        service.shutdown();
    }

例子二 shutdownNow()

    public void shutdownNowTest() throws InterruptedException {

        ExecutorService service = new ThreadPoolExecutor(10,10,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1000));

        for (int i = 0; i < 1000; i++) {
            service.submit(() ->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    System.out.println("接受中断,结束线程~~");
                    //这里响应中断
                    return;
                }
                System.out.println(Thread.currentThread().getName());
            });
        }

        final List<Runnable> runnables = service.shutdownNow();
        System.out.println(runnables);
    }

总结

今天从一个实际线上问题入手,为了解决优雅关闭线程池问题,分析了线程池内部shutdown和shutdNow方法的区别,最后通过Spring Bean 生命周期中destroy()完成了这个问题。
这次只是引出了线程池关闭的一小块内容,对于线程池整体架构设计,下次可以再分析下线程池内部原理。

以上是关于如何等待java线程池中所有任务完成的主要内容,如果未能解决你的问题,请参考以下文章

Java如何判断线程池所有任务是不是执行完毕

Java如何判断线程池所有任务是不是执行完毕

Java如何判断线程池所有任务是不是执行完毕

c# 怎么等待线程池中所有线程都运行结束在运行主线程

Java 线程池

Java基础干货如何优雅关闭线程池实践总结