java线程池ThreadPoolExecutor

Posted king西阳

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java线程池ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。

使用线程池的好处:

  • 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处-理效率
  • 线程并发数量过多,抢占系统资源从而导致阻塞
  • 对线程进行一些简单的管理

1、ThreadPoolExecutor的一个构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

参数说明:

corePoolSize:当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
maximumPoolSize:线程池中允许的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果使用的是无界的队列,那么这个参数没有意义了。
keepAliveTime:线程池工作线程空闲后,保持的存活时间。
unit :上面时间的单位
workQueue:执行任务在执行之前加入的queue。该队列仅保存由execute方法提交的Runnable任务。
threadFactory:创建新线程的工厂方法。可以通过线程工厂给每个创建的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字
handler:当队列满时,线程处于饱和策略。

2、工作流程

如果使用无界队列很简单,开启核心线程数,多余的全部阻塞直到内存耗尽,故不建议使用。
如果使用有界队列,工作流程如下:
  * 若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
  * 若大于corePoolSize,则会将任务加入队列,
  * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
  * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。

3、handler饱和策略

官方给出的饱和策略有如下几种:

  AbortPolicy:直接抛异常。
  DiscardPolicy:顾名思义,直接丢弃多余的任务。
  DiscardOldestPolicy:丢弃队列中最老的任务,并且重试exexute方法,并发书中说丢弃最新的一个任务,这边书中应该描述错了。
  CallerRunsPolicy:在调用线程中执行丢弃的任务。

也可以自定义策略来处理,例如:

public class MyRejected implements RejectedExecutionHandler {
    public MyRejected(){
    }

    //拒绝策略实际调用的方法处理丢弃的任务,可以做一些日志处理
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("自定义饱和策略,当前被拒绝任务为:" + r.toString());
    }
}

//使用
...
 ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,              //coreSize
                200,              //MaxSize
                60,             //60
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3)         //指定一种队列 (有界队列)
                , new MyRejected()
                , new DiscardOldestPolicy()
        );
...

4、队列

ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按照FIFO(先进先出)原则对元素进行排序
LinkedBlockingQueue:是一个基于链表结构的有界阻塞队列,此队列按照FIFO排序元素,吞吐量高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool(n)使用了此队列
PriorityBlockingQueue:一个具有优先级的无限阻塞队列
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等待另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了此队列

5、示例

任务线程:

public class MyTask implements Runnable {

    private int taskId;
    private String taskName;

    public MyTask(int taskId, String taskName){
        this.taskId = taskId;
        this.taskName = taskName;
    }

    public int getTaskId() {
        return taskId;
    }

    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    @Override
    public void run() {
        try {
            System.out.println("run taskId =" + this.taskId);
            Thread.sleep(5*1000);
            //System.out.println("end taskId =" + this.taskId);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }       
    }

    public String toString(){
        return Integer.toString(this.taskId);
    }

}

线程池执行器:

public class UseThreadPoolExecutor1 {


    public static void main(String[] args) {
        /**
         * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
         * 若大于corePoolSize,则会将任务加入队列,
         * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
         * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
         * 
         */ 
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,              //coreSize
                2,              //MaxSize
                60,             //60
                TimeUnit.SECONDS, 
                new ArrayBlockingQueue<Runnable>(3)         //指定一种队列 (有界队列)
                //new LinkedBlockingQueue<Runnable>()
                //, new MyRejected()
                , new DiscardOldestPolicy()
                );

        MyTask mt1 = new MyTask(1, "任务1");
        MyTask mt2 = new MyTask(2, "任务2");
        MyTask mt3 = new MyTask(3, "任务3");
        MyTask mt4 = new MyTask(4, "任务4");
        MyTask mt5 = new MyTask(5, "任务5");
        MyTask mt6 = new MyTask(6, "任务6");

     //ThreadPoolExecutor.execute(Runnable command)方法即可向线程池内添加一个任务。 pool.execute(mt1); pool.execute(mt2); pool.execute(mt3); pool.execute(mt4); pool.execute(mt5); pool.execute(mt6);
     //关闭线程池 pool.shutdown(); } }

执行结果:

run taskId =5
run taskId =1
run taskId =3
run taskId =4
run taskId =6

分析:
使用有界阻塞队列,先为1开启一个核心线程,然后为2 3 4入队,然后
为5开启一个线程(最大线程),本来应该是舍弃6,但是指定了舍弃策略
是舍弃最老的,也就是阻塞队列队头元素2,所以舍弃2而执行6

6、常见四种线程池

1.可缓存线程池CachedThreadPool()
源码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
}    

这种线程池内部没有核心线程,线程的数量是有没限制的。在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。没有工作的线程(闲置状态)在超过了60S还不做事,就会销毁。

 

创建方法:

ExecutorService mCachedThreadPool = Executors.newCachedThreadPool();

使用:

private void startDownload(final ProgressBar progressBar, final int i) {
        mCachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                int p = 0;
          //每隔10s progressBar.setMax(
10); while (p < 10) { p++; progressBar.setProgress(p); Bundle bundle = new Bundle(); Message message = new Message(); bundle.putInt("p", p); bundle.putString("ThreadName", Thread.currentThread().getName()); message.what = i; message.setData(bundle); mHandler.sendMessage(message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }); }

2.定长线程池FixedThreadPool 
源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}

该线程池的最大线程数等于核心线程数,所以在默认情况下,该线程池的线程不会因为闲置状态超时而被销毁。

如果当前线程数小于核心线程数,并且也有闲置线程的时候提交了任务,这时也不会去复用之前的闲置线程,会创建新的线程去执行任务。如果当前执行任务数大于了核心线程数,大于的部分就会进入队列等待。等着有闲置的线程来执行这个任务。

创建方法:

//nThreads => 最大线程数即maximumPoolSize
ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads);
//threadFactory => 创建线程的方法,用得少
ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);

使用:

private void startDownload(final ProgressBar progressBar, final int i) {
    mFixedThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            //....逻辑代码自己控制
        }
     });
}

3.单线程线程池SingleThreadPool

源码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>()));
}    

有且仅有一个工作线程执行任务,所有任务按照指定顺序执行,即遵循队列的入队出队规则


创建方法:

ExecutorService mSingleThreadPool = Executors.newSingleThreadPool();

使用同FixedThreadPool 

4.定时任务线程池ScheduledThreadPool
源码:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
        DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
        new DelayedWorkQueue());
}

DEFAULT_KEEPALIVE_MILLIS就是默认10L,这里就是10秒。这个线程池有点像是吧CachedThreadPool和FixedThreadPool 结合了一下。

不仅设置了核心线程数,最大线程数也是Integer.MAX_VALUE。
这个线程池是上述4个中为唯一个有延迟执行和周期执行任务的线程池。

创建方法:

//nThreads => 最大线程数即maximumPoolSize
ExecutorService mScheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);

使用:

//表示在3秒之后开始执行我们的任务。
mScheduledThreadPool.schedule(new Runnable() {   @Override public void run() { //.... } }, 3, TimeUnit.SECONDS);
//延迟3秒后执行任务,从开始执行任务这个时候开始计时,每5秒执行一次不管执行任务需要多长的时间。 

mScheduledThreadPool.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        //....
    }
},3, 5, TimeUnit.SECONDS);

7、其他

  * 执行线程次提交的任务通过execute方法和submit方法(submit提供了重载方法)。execute不返回值,而submit返回值,并且可以通过返回的Future对象的get方法查看返回值。

  * 使用无界BlockingQueue:与有界队列相比,除非系统资源耗尽,否则无界的队列不存在任务入队失败的情况。当有新任务到来,系统的线程小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加。若后续仍有新的队列加入,而没有空闲的线程资源,则队列直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。所以一般不建议使用无界队列,另外使用无界队列的时候,最大线程数,拒绝策略等都失去了意义

  *阿里编译器建议线程池创建时最好能手动进行创建,给出理由:

    newFixedThreadPoolExecutor和newSingleThreadPoolExecutor:主要问题是堆积请求的处理队列可能会消耗非常大的内存,甚至OOM     
    newCachedThreadPoolExecutor和newScheduledThreadPoolExecutor:主要问题是线程数最大是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM

 

以上是关于java线程池ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章

java线程池之ThreadPoolExecutor

Java多线程系列——线程池原理之 ThreadPoolExecutor

Java - "JUC线程池" ThreadPoolExecutor原理解析

Java常用四大线程池用法以及ThreadPoolExecutor详解

Java常用四大线程池用法以及ThreadPoolExecutor详解

java线程池之ThreadPoolExecutor