JAVA自定义阻塞型线程池

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA自定义阻塞型线程池相关的知识,希望对你有一定的参考价值。

Java的线程池ThreadPoolExecutor是很常用的,常见构造如下:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

  • corePoolSize: 线程池维护线程的最少数量
  • maximumPoolSize:线程池维护线程的最大数量
  • keepAliveTime: 线程池维护线程所允许的空闲时间
  • unit: 线程池维护线程所允许的空闲时间的单位
  • workQueue: 线程池所使用的缓冲队列
  • handler: 线程池对拒绝任务的处理策略

  正式使用中一般都会设置一个最大缓冲队列容量,如果线程池满它会对继续添加的任务线程执行指定的拒绝策略,ThreadPoolExcetor 的最后一个参数指定了拒绝策略,JDK提供了四种拒绝策略:
AbortPolicy 策略、CallerRunsPolicy策略、 DiscardOledestPolicy策略、DiscardPolicy策略。

AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。

CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。

DiscardOleddestPolicy策略: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。

DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。

  可以看到默认提供的四种策略似乎都不太友好,要么放弃要么抛异常,有时候我们需要保证任务添加不会失败,只要被添加的任务能依次顺序执行就好了,不需要这个添加动作立即响应,

即让线程池等待池中的任务完成后再继续添加新任务,此时需要自定义拒绝策略以及任务缓冲队列,代码如下:

package com.montnets.task;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 自定义阻塞型线程池 当池满时会阻塞任务提交
 * 
 * @ClassName: BlockThreadPool
 * @Description: TODO
 * @author: wangs
 * @date: 2018-1-24 下午5:24:54
 */
public class BlockThreadPool {

    private ThreadPoolExecutor pool = null;
    
    public BlockThreadPool(int poolSize) {
        pool = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), new CustomThreadFactory(),
                new CustomRejectedExecutionHandler());
    }
    
    public void destory() {
        if (pool != null) {
            pool.shutdownNow();
        }
    }

    private class CustomThreadFactory implements ThreadFactory {
        private AtomicInteger count = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = BlockThreadPool.class.getSimpleName() + count.addAndGet(1);
            t.setName(threadName);
            return t;
        }
    }

    private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // 核心改造点,由blockingqueue的offer改成put阻塞方法
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void execute(Runnable runnable) {
        this.pool.execute(runnable);
    }

    // 测试构造的线程池
    public static void main(String[] args) {
        BlockThreadPool pool = new BlockThreadPool(3);
        for (int i = 1; i < 100; i++) {
            System.out.println("提交第" + i + "个任务!");
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getId() + "=====开始");
                        TimeUnit.SECONDS.sleep(10);
                        System.out.println(Thread.currentThread().getId() + "=====【结束】");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("【提交第" + i + "个任务成功!】");
        }

        // 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
        // exec.destory();
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  该类的核心实现还是ThreadPoolExecutor,只是自定义了拒绝策略CustomRejectedExecutionHandler,缓冲队列也修改成了ArrayBlockingQueue,即实现了阻塞功能,

当核心池和缓存队列满了之后外部再调用execute时就会阻塞住,一直等到池里某个任务完成后释放出空闲线程以后,再将该任务添加到缓存队列,而不会抛异常或丢弃该任务。

适用于一些定时扫描触发任务类场景。


-- 多情为何忘情,无心怎去用心,空有知人之智,恨无自知之明。

 





以上是关于JAVA自定义阻塞型线程池的主要内容,如果未能解决你的问题,请参考以下文章

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

JUC并发编程 共享模式之工具 线程池 -- 自定义线程池(阻塞队列)

通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务

通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务

通过自定义阻塞队列触发ThreadPoolExecutor创建非核心线程执行任务

Java线程池监控小结