Java基础干货如何优雅关闭线程池实践总结

Posted 在路上的德尔菲

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java基础干货如何优雅关闭线程池实践总结相关的知识,希望对你有一定的参考价值。

背景

之前架构中,消费端从Redis队列里拉取任务放到线程池中,如果部署新代码上线,如何保证之前线程池中正在执行的任务和队列里的等待的任务能执行完成而不是直接丢失。

解决方案

方案是在容器销毁时对该线程池Bean做shutdown()操作,下面详细介绍原理。

考虑到使用的Spring中线程池,线程池Bean的类型为org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor,继承了ExecutorConfigurationSupport抽象类,简单来说这个抽象类具有Spring Bean和线程池特征。下图为ThreadPoolTaskExecutor继承实现关系图。
在这里插入图片描述
ExecutorConfigurationSupport 的destroy()方法中 ,如果该线程池waitForTasksToCompleteOnShutdown为true时,执行线程池的shutdown()方法,否则执行线程池shutdownNow()方法。下面为ExecutorConfigurationSupport 的destroy()方法

源码剖析

/**
	 * Calls {@code shutdown} when the BeanFactory destroys
	 * the task executor instance.
	 * @see #shutdown()
	 */
	@Override
	public void destroy() {
		shutdown();
	}

	/**
	 * Perform a shutdown on the underlying ExecutorService.
	 * @see java.util.concurrent.ExecutorService#shutdown()
	 * @see java.util.concurrent.ExecutorService#shutdownNow()
	 * @see #awaitTerminationIfNecessary()
	 */
	public void shutdown() {
    	//waitForTasksToCompleteOnShutdown 默认为false
		if (this.waitForTasksToCompleteOnShutdown) {
			this.executor.shutdown();
		}
		else {
			this.executor.shutdownNow();
		}
		awaitTerminationIfNecessary();
	}

其中 this.executor.shutdown() 逻辑是熟悉的线程池ThreadPoolExecutor

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */    
	public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
	   		// 检查权限
            checkShutdownAccess();
            // 设置线程池状态
	    	advanceRunState(SHUTDOWN);
	   		// 中断空闲线程
            interruptIdleWorkers();
            // 钩子函数,主要用于清理一些资源
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

关于shutdown方法总结有以下几个关键点

  • shutdown 会首先将线程设置成 SHUTDOWN 状态,然后中断所有没有正在运行的线程(空闲线程)。
  • 正在执行的线程和已经在队列中的线程并不会被中断,其实就是要等待所有任务正常全部结束以后才会关闭线程池,符合“优雅”关闭线程池需求
  • 调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。
  • shutdown()返回值为空。

对比shutdownNow()方法

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
	    	// 检查状态
            checkShutdownAccess();
	    	// 将线程池状态变为 STOP
            advanceRunState(STOP);
            // 中断所有线程,包括正在工作线程以及空闲线程
	    	interruptWorkers();
	    	// 丢弃工作队列中存量任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
       

shutdownNow()有以下几点需要明白

  • shutdown 会首先将线程设置成 STOP 状态。
  • 然后中断所有线程即执行interrupt,抛出InterruptedException。
  • shutdownNow()返回值为List类型,里面为全部取消执行的任务。

上面awaitTerminationIfNecessary()具体实现,目的是控制等待的时间,防止任务无限期的运行。

	/**
	 * Wait for the executor to terminate, according to the value of the
	 * {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"} property.
	 */
	private void awaitTerminationIfNecessary() {
    //awaitTerminationSeconds默认为0,等待线程池到Teminate的时间
		if (this.awaitTerminationSeconds > 0) {
			try {
				if (!this.executor.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) {
          //省略日志逻辑
				}
			}
			catch (InterruptedException ex) {
        //省略日志逻辑
				Thread.currentThread().interrupt();
			}
		}
	}

实践操作

第一步对ThreadPoolTaskExecutor的WaitForTasksToCompleteOnShutdown和AwaitTerminationSeconds属性进行配置。

threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);

第二步在Bean配置中添加 destroy-method=“destroy”,我们知道Spring Bean的生命周期,使在Spring容器销毁时,该线程池Bean执行destroy()方法。

    <bean id="threadPool"
          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" destroy-method="destroy">
    </bean>

扩展

感兴趣的话可以自己去跑下代码,比较下shutdown()和shutdownNow()的区别,通过debug和跑出来的结果理解得会比较深刻
例子一 shutdown()

    public void shutdownTest() throws InterruptedException {

        ExecutorService service = new ThreadPoolExecutor(10,10,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());

        for (int i = 0; i < 1000; i++) {
            service.submit(() ->{
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    System.out.println("接受中断,不处理~~");
                }
                	System.out.println(Thread.currentThread().getName());
            });
        }
        service.shutdown();
    }

例子二 shutdownNow()

    public void shutdownNowTest() throws InterruptedException {

        ExecutorService service = new ThreadPoolExecutor(10,10,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1000));

        for (int i = 0; i < 1000; i++) {
            service.submit(() ->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    System.out.println("接受中断,结束线程~~");
                    //这里响应中断
                    return;
                }
                System.out.println(Thread.currentThread().getName());
            });
        }

        final List<Runnable> runnables = service.shutdownNow();
        System.out.println(runnables);
    }

总结

今天从一个实际线上问题入手,为了解决优雅关闭线程池问题,分析了线程池内部shutdown和shutdNow方法的区别,最后通过Spring Bean 生命周期中destroy()完成了这个问题。
这次只是引出了线程池关闭的一小块内容,对于线程池整体架构设计,下次可以再分析下线程池内部原理。

以上是关于Java基础干货如何优雅关闭线程池实践总结的主要内容,如果未能解决你的问题,请参考以下文章

如何优雅的关闭Java线程池

如何优雅的关闭线程池?

如果优雅地关闭ExecutorService提供的java线程池

如何优雅地关闭线程池?从源码剖析线程池的正确销毁姿势。

Java 线程池实践出真知

java优雅定制线程池