ThreadPoolExecutor

Posted zheaven

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。

类的结构:

Executor
-ExecutorService
--AbstractExecutorService
---ThreadPoolExecutor

ThreadPoolExecutor七大构造参数:

package com.dwz.executors;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 *     ThreadPoolExecutor七大构造参数
 *    测试情况:
 *    1.corePoolSize=1,maximumPoolSize=2,BlockingQueue size=1.What happen when submit 3 task?
 *    2.corePoolSize=1,maximumPoolSize=2,BlockingQueue size=5.What happen when submit 7 task?
 *    3.corePoolSize=1,maximumPoolSize=2,BlockingQueue size=5.What happen when submit 8 task?
 */
public class ThreadPoolExecutorBuild {
    /*ThreadPoolExecutor参数介绍
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      RejectedExecutionHandler handler*/
    private static ExecutorService buildThreadPoolExecutor() {
        ExecutorService executorService = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), r -> {
                    Thread t = new Thread(r);
                    return t;
                }, new ThreadPoolExecutor.AbortPolicy()) ;
        System.out.println("====The ThreadPoolExecutor create done.");
        
        executorService.execute(() -> sleepSeconds(100));
        executorService.execute(() -> sleepSeconds(10));
        executorService.execute(() -> sleepSeconds(10));
        return executorService;
    }
    
    private static void sleepSeconds(long seconds) {
        try {
            System.out.println("* " + Thread.currentThread().getName() + " *");
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) buildThreadPoolExecutor();
        
        int activeCount = -1;
        int queueSize = -1;
        
        while(true) {
            if(activeCount != threadPoolExecutor.getActiveCount()
                    || queueSize != threadPoolExecutor.getQueue().size()) {
                System.out.println(threadPoolExecutor.getActiveCount());
                System.out.println(threadPoolExecutor.getCorePoolSize());
                System.out.println(threadPoolExecutor.getQueue().size());
                System.out.println(threadPoolExecutor.getMaximumPoolSize());
                activeCount = threadPoolExecutor.getActiveCount();
                queueSize = threadPoolExecutor.getQueue().size();
                System.out.println("==================================");
            }
        }
    }
}

ThreadPoolExecutor的关闭

1.使用shutdown()

package com.dwz.executors;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class ThreadPoolExecutorTask {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10), r -> {
                    Thread t = new Thread(r);
                    return t;
                }, new ThreadPoolExecutor.AbortPolicy()) ;
        
        IntStream.range(0, 20).boxed().forEach(i -> 
            executorService.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println(Thread.currentThread().getName() + "[ " + i + " ] finish done.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            })
        );
        
        /*
         * shutdown()
         * 比如总共有20个线程,10个已经开始工作,10个处于空闲状态(idle),执行完shutdown()后
         * 10个已经开始工作的会等待执行完退出-->interrupt 10个idle-->20个线程全部退出
         */
        executorService.shutdown();
        System.out.println("======================over=======================");
    }
}

这样执行结果不能保证over最后打印,想实现over最后输出需配合awaitTermination方法使用

优化后的代码

        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
        System.out.println("======================over=======================");

2.使用shutdownNow()

package com.dwz.executors;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class ThreadPoolExecutorTask {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10), r -> {
                    Thread t = new Thread(r);
                    return t;
                }, new ThreadPoolExecutor.AbortPolicy()) ;
        
        IntStream.range(0, 20).boxed().forEach(i -> 
            executorService.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println(Thread.currentThread().getName() + "[ " + i + " ] finish done.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            })
        );
        
        /*
         * shutdownNow()
         * 共有20个线程,10个已经开始工作,10个在队列中,执行完shutdownNow()后
         * 返回10个等待执行的线程,然后立即打断所有的线程,立即退出
         */
        List<Runnable> runnableList = null;
        try {
            runnableList = executorService.shutdownNow();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("======================over=======================");
        System.out.println(runnableList);
        System.out.println(runnableList.size());
    }
}
即使使用了shutdown()和shutdownNow(),有时候线程也不能完全停止,可以配合setDaemon(true)来使用
package com.dwz.executors;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class ThreadPoolExecutorLongTimeTask {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10), r -> {
                    Thread t = new Thread(r);
                    //设置为守护线程,随着主线程的结束而结束
                    t.setDaemon(true);
                    return t;
                }, new ThreadPoolExecutor.AbortPolicy()) ;
        
        IntStream.range(0, 10).boxed().forEach(i -> 
            executorService.submit(() -> {
                while(true) {
                    TimeUnit.SECONDS.sleep(1);
                }
            })
        );
        
        executorService.shutdown();
//        executorService.shutdownNow();
    }
}

 

以上是关于ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章

聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源代码分析

ThreadPoolExecutor().map 与 ThreadPoolExecutor().submit 有何不同?

Java中的线程池——ThreadPoolExecutor源代码分析

JDK1.7中的ThreadPoolExecutor源代码剖析

Java 1.7 ThreadPoolExecutor源代码解析

高并发多线程基础之ThreadPoolExecutor源代码分析