线程池定制初探

Posted Secondworld

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池定制初探相关的知识,希望对你有一定的参考价值。

背景

? 我在的公司虽然是移动支付领域的公司。但是我做的业务类似于管理系统,所以一开始写代码的时候没有考虑到数据的量的问题。开始有一个统计页面,大概要统计的数据分为十多个维度,然后每个维度需要考虑十个左右的方面。也就是统计页面轻轻地点击一个查询按钮,要进行100次左右的数据库查询。开始数据量小的时候,查询还能够使用,页面不会超时。到后面数据量越来越大,最大的一张表数据量已经超过1亿。这时候悲催的事情发生了--- 页面点击查询直接未响应.....


方案考虑

? 其实当时的方案我想了两种:

  1. 优化业务实现逻辑,其实在查询的时候一个表的多个维度的查询可以传一个维度列表进去。查出来结果之后,后台在进行分组计算。
  2. 采用多线程,对每一个维度采用一个线程去执行。这样其实每个线程的查询次数在10次左右,总的时间差不多可以缩短10倍。

? 当然,当时我知道最好的方案是混合使用1和2。但当时1的实现存在以下问题,最终让我决定选择方案2:

  1. 因为每个维度涉及多张表, 不同的表归属于不同的模块。如果某张表的查询条件不支持维度列表,那么需要提需求给对应模块开发...... (何年何月能完)
  2. 方案1的改动涉及代码变动较多,且不好封装每个线程任务,写出来的代码逻辑有点绕,后台存在大量的重新分维度统计的代码。简而言之就是不优雅

实现考虑

? 既然最终选定方案2的话,那么自然考虑到选择线程池。那么选啥线程池呢?Single?Schedule肯定不用想直接PASS。Cached?其实当前来说是可行的,因为当前线上的维度也就十多个。以Cached线程池的特性,只要同时并发的线程数量不至于太大,也不至于给系统太大压力导致系统瘫痪。但是因为维度会随着业务的增长而越来越多,如果后续维度增加到20甚至30,那么对系统的压力就无法预估了。

? 思前想后,我最终决定选择Fix线程池,将线程池固化大小为10个。但这时候我又想,其实统计页面一天查询的次数并不多。可能就每天早上点击查询一次,后面可能就不再点查询。那么这时候又出现了两种蛋疼的选择:

  1. 直接在查询的方法内部初始化线程池
  2. 在类的属性中初始化线程池

? 第1种方案的话,每次查询都要重新初始化线程池,造成很大的资源消耗。如果连续查询多次,并不会后面比前面快,反而可能由于不停的线程池销毁创建导致越来越慢。最终我选择了第2种方案,当然我并没有选择饿汉模式直接初始化,而是选择了懒汉模式在方法中进行线程池初始化,且通过锁保证只初始化一次。


想法尝试

? 到这里你如果觉得我的定制初探就完了,那你就too young too naive。我不追求可行,只追求完美~

这时候我就在想,其实根据用户的操作习惯,统计页面的查询按钮,要么就隔着几个小时不按,要么就可能一时心血来潮,连续查询几天或者同一天分多个维度查询多次。而大家都知道Fix线程池固化了线程池的大小,即使后面连续几个小时没有任务来,仍然会一直保持着初始大小的线程数。那么能不能实现即能够控制线程数量Fix,又可以在空闲的时候销毁核心线程呢?答案当然是有的,关键点在于:ThreadPoolExecutor的allowCoreThreadTimeOut方法

/**
     * Sets the policy governing whether core threads may time out and
     * terminate if no tasks arrive within the keep-alive time, being
     * replaced if needed when new tasks arrive. When false, core
     * threads are never terminated due to lack of incoming
     * tasks. When true, the same keep-alive policy applying to
     * non-core threads applies also to core threads. To avoid
     * continual thread replacement, the keep-alive time must be
     * greater than zero when setting <tt>true</tt>. This method
     * should in general be called before the pool is actively used.
     * @param value <tt>true</tt> if should time out, else <tt>false</tt>
     * @throws IllegalArgumentException if value is <tt>true</tt>
     * and the current keep-alive time is not greater than zero.
     *
     * @since 1.6
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");

        allowCoreThreadTimeOut = value;
    }

? 从源码的注释来看,该方法可以支持线程池的keep-alive time的设置同时对核心线程和非核心线程生效。具体为啥,后面我分析线程池源码的时候会讲到,现在我们只需要看看用到该处的源码(在ThreadPoolExecutor的getTask方法中):

/**
     * Gets the next task for a worker thread to run.  The general
     * approach is similar to execute() in that worker threads trying
     * to get a task to run do so on the basis of prevailing state
     * accessed outside of locks.  This may cause them to choose the
     * "wrong" action, such as trying to exit because no tasks
     * appear to be available, or entering a take when the pool is in
     * the process of being shut down.  These potential problems are
     * countered by (1) rechecking pool state (in workerCanExit)
     * before giving up, and (2) interrupting other workers upon
     * shutdown, so they can recheck state. All other user-based state
     * changes (to allowCoreThreadTimeOut etc) are OK even when
     * performed asynchronously wrt getTask.
     *
     * @return the task
     */
    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

? 关键在于workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),该方法表示尝试从等待队列中获取任务,如果超过keepAlive time,则直接返回null。如果返回null的话,work线程就会被终止。

? 好了这些都是后话,在我看了线程池的源码之后才能够清楚地知道为啥这个参数有这个作用。那么在之前,我是怎么测试验证我的想法的呢?其实很简单:

  1. 我先参照线程池的默认的DefaultThreadFactory定义自己的线程工厂,目的是为了获取线程工厂内的ThreadGroup属性,因为ThreadGroup类有一个activeCount方法,该方法可以获取线程组内活跃的线程个数。
class MyThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        MyThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
        
        // 我所增加的方法,为了获取线程组
        public ThreadGroup getThreadGroup() {
            return this.group;
        }
    }
  1. 万事俱备,只欠东风了,我只需要构造两种不同的情况验证我的猜想即可!
MyThreadFactory myThreadFactory = new MyThreadFactory();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS,
                                new LinkedBlockingQueue<Runnable>(), myThreadFactory); 
// executor.allowCoreThreadTimeOut(true);
for (int i = 0; i <= 20; i++) {
  executor.submit(new MyRunnable());
}
System.out.println(myThreadFactory.getThreadGroup().activeCount());     // 6 
Thread.sleep(20000);
System.out.println("After destroy, active thread count:" + myThreadFactory.getThreadGroup().activeCount());             // 6/1
executor.shutdown();

? 运行的结果:

  1. 如果不执行executor.allowCoreThreadTimeOut(true);两个activeCount的结果都是6
  2. 如果执行executor.allowCoreThreadTimeOut(true);第一个activeCount的结果为6,第二个activeCount的结果为1

最终实现

? 好了,终于到最终定制实现了。我的代码实现如下(类为Spring管理的类,最终线程池shutdown在PreDestroy的时候):

    private volatile ThreadPoolExecutor searchExecutors; 
    
    private final Object lock = new Object();
    
    /**
     * 初始化线程池,开始不进行初始化,免得浪费系统资源
     */
    private void initExecutor() {
        if (searchExecutors != null) {
            return;
        }
        
        synchronized (lock) {
            if (searchExecutors == null) {
                 // 设置一个固定大小为10,核心线程如果超过10分钟空闲也可销毁的线程池
                ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
                tempExecutor.allowCoreThreadTimeOut(true);
                this.searchExecutors = tempExecutor;
            }
        }
    }

    @PreDestroy
    public void destroy() {
        if (searchExecutors != null) {
            searchExecutors.shutdown();
        }
    }

这里再说两点

  1. 这个初始化方法采用了double-check-lock的方式,来保证多线程并发获取到的是同一个线程池实例
  2. 注意到在设置属性searchExecutors之前借助了一个tempExecutor。这样也是为了防止ThreadPoolExecutor对象已经被初始化,但是allowCoreThreadTimeOut还未被执行的问题。(对象过早逃逸导致属性与预期不符)。

总结

? 通过这次线程池定制初探,发现其实看起来再没有技术含量的工作,如果细细想下去还是会有很多可以深入研究的东西。而做软件其实也要像做艺术品一样,多考虑不同的实现可能,尽量选择最完美的解决方案。

以上是关于线程池定制初探的主要内容,如果未能解决你的问题,请参考以下文章

Java 线程线程池初探

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

Java 线程池初探

由浅入深了解线程池之源码初探

java线程池的初探

java优雅定制线程池