ThreadPoolExecutor 线程池

Posted xieyanke

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor 线程池相关的知识,希望对你有一定的参考价值。

用途:

用于自动化管理线程, 开发人员只需要关注业务实现, 无需关注线程的管理, 降低开发要求

方法解释:

Executor
//执行任务(若执行线程有任务 则进入任务队列等待工作线程拉取) 无返回值
void execute(Runnable command);

ExecutorService

//关闭线程池 不再接收新的任务 但是等待队列中的任务仍会执行完
void shutdown();

//关闭线程池 不再接收新的任务 并且等待队列中的任务不会被执行
List<Runnable> shutdownNow();

//返回线程池状态是否关闭
boolean isShutdown();

//返回线程是是否结束运行 只有在执行shutdown和shutdownNow 之后,并且已经执行结束才会返回true
boolean isTerminated();

//等待执行完结
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

//提交任务 类似execute() 区别是有返回结果对象 
<T> Future<T> submit(Callable<T> task);

//提交任务 若任务执行完成 返回当前给定的result
<T> Future<T> submit(Runnable task, T result);

//提交任务 基本和execute()方法一样, 虽然有返回Future<?>, 但实际是Future<Void>,无返回值
Future<?> submit(Runnable task);

//批量提交任务 并且所有任务依旧会进入任务等候队列 在此批所有任务执行完成之后返回所有结果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

//批量提交任务 并且所有任务依旧会进入任务等候队列 但是设有时间限制 如果超过给定时间仍有未执行完成的任务 则直接返回所有结果(此时可能会有些Future没有结果)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;

//批量提交任务 并且所有任务依旧会进入任务等候队列 只要有一个执行完成则立即返回执行完成的那个任务结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;

//批量提交任务 并且所有任务依旧会进入任务等候队列 只要有一个执行完成则立即返回执行完成的那个任务结果 若无任何返回则抛出超时异常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

 

使用举例:

execute(Runnable command)

public static void main(String[] args) throws Exception {
    ExecutorService executorService = new ThreadPoolExecutor(1, 10,0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    for (int i=1; i<5; i++){
        int finalI = i;
        executorService.submit(()->{
            System.out.println("打印"+finalI);
        });
    }
}
结果:
打印1
打印2
打印3
打印4

submit(Callable<T> task)

//提交任务 类似execute() 区别是有返回结果对象 public static void main(String[] args) throws Exception {
        ExecutorService executorService = new ThreadPoolExecutor(1, 10,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    List<Future<Integer>> futureList = Lists.newArrayList();
    for (int i=1; i<5; i++){
        int finalI = i;
        Future<Integer> future = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(200);
                    return finalI;
                }
            });
        futureList.add(future);
    }
    //等待所有提交的任务都执行完
    Thread.sleep(5000);
    for (Future<Integer> future : futureList){
        System.out.println("返回:"+future.get());
    }
}
结果:
返回:1
返回:2
返回:3
返回:4

shutdown()

public static void main(String[] args) throws Exception {
    ExecutorService executorService = new ThreadPoolExecutor(1, 10,0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    for (int i=1; i<5; i++){
        int finalI = i;
        executorService.submit(()->{
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("打印"+finalI);
        });
    }
    executorService.shutdown();
    System.out.println("线程池发起shutdown请求");
    //等待线程池能执行完
    Thread.sleep(5000);
    System.out.println(executorService.isShutdown());
}
结果:
线程池发起shutdown请求
打印1
打印2
打印3
打印4
true

shutdownNow()

public static void main(String[] args) throws Exception {
    ExecutorService executorService = new ThreadPoolExecutor(1, 10,0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    for (int i=1; i<5; i++){
        int finalI = i;
        executorService.submit(()->{
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("打印"+finalI);
        });
    }
    executorService.shutdownNow();
    System.out.println("线程池发起shutdownNow请求");
    //等待线程池能执行完
    Thread.sleep(5000);
    System.out.println(executorService.isShutdown());
}
结果:
线程池发起shutdownNow请求
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at com.moredian.trade.biz.SocketServerTest.lambda$main$0(SocketServerTest.java:138)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
打印1
true

类图:

技术图片

数据结构图:

投递任务可以是Runnable Callable 下图举Runnable的一个例子,从投递,到进入队列,再到被worker线程消费

技术图片

 

 

工作流程简述:

1 提交任务, exucude、submit、 invokeAll、 invokeAny

2 获取当前工作线程数与核心线程数进行对比, 若核心线程数大于当前工作线程数则添加新的工作线程, 直到核心线程数等于工作线程数

3 工作线程处理完任务后向任务队列获取新的任务执行, 若任务队列没有任务则阻塞当前工作线程(这也是为什么不需要重复创建线程的原因,是否永久阻塞主要看keepAlive),直至新的任务进任务队列

4 随着任务提交, 当等待队列已满时,若满足 条件1: 工作线程都在执行中, 条件2: 最大线程数(maximumPoolSize)大于核心线程 此时开辟新的工作线程, 直至工作线程等于最大线程数

5 当所有工作线程都参与任务处理,任务队列仍满队列时,新的任务将抛弃不执行

6 随着任务的执行, 已经没有新的任务进来, 工作线程中将只保留核心(阻塞等待), 非核心线程将被释放

 

关键源码分析:

submit(Runnable task)

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
//执行任务
public void execute(Runnable command) {
    //若执行任务为空 抛出异常
    if (command == null)
        throw new NullPointerException();
    //获取当前ctl的值 ctl中是高3位作为状态值,低28位作为线程总数值
    //有对应的位运算来获取 一值多用
    int c = ctl.get();
    //若工作中线程数小于核心线程数 则增加一个工作线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //当前线程池是否处于运行状态 且 将任务投递到任务队列是否成功(如果任务队列限定最大长度 则offer返回fase)
    if (isRunning(c) && workQueue.offer(command)) {
        //再次获取ctl的值
        int recheck = ctl.get();
        //若非线程池非运行中(可能是shutdown了), 将当前任务从等待队列中移除 刚放进去就移除
        if (! isRunning(recheck) && remove(command))
            //拒绝此任务
            reject(command);
        //若工作线程数为0 增加一个工作线程 
        //这段代码看起来比较奇怪, 发生在核心线程也能被回收的场景(创建线程池时可以设置)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //增加工作线程失败 拒绝任务
    else if (!addWorker(command, false))
        reject(command);
}
//添加一个工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        //获取当前ctl值
        int c = ctl.get();
        //获取线程池运行状态
        int rs = runStateOf(c);
        //下面这句代码非常难理解对照 !(1==1 && 2==2 && 3==3) == (1!=1 || 2!=2 || 3!=3)
        //将其改写 if(rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
        //rs> SHUTDOWN表示线程池非运行状态
        //总的来说就是如果非运行中 且 (状态不为shutdown 或 当前任务不等于空 或 工作队列为空)
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        for (;;) {
            //当前工作线程数
            int wc = workerCountOf(c);
            //如果工作线程数大于容量 或者 
            //判断是否为核心线程 1若是核心线程不能大于核心线程总数 2若是非核心线程则不能大于最大线程数
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //工作线程数+1 并且跳出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //若此时运行状态发生改变 则继续循环
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //以下代码为 compareAndIncrementWorkerCount(c) = true的前提下 并且ctl值已经发生改变 运行线程数已经加1
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //新增一个工作线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //全局锁
            mainLock.lock();
            try {
                //当前运行状态
                int rs = runStateOf(ctl.get());
                //工作线程集合workers增加一个工作线程
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //工作线程添加成功则启动当前工作线程 
            if (workerAdded) {
                //开始从任务队列取任务执行
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

维持线程池中工作线程不释放原理

从新建worker开始 new Worker(firstTask),  下面代码重点关注标红字体, 原理是利用阻塞队列调用 poll和take 方法时, 若取不到值则会阻塞当前前程,直至阻塞队列重新添加数据(实现原理是基于AQS 可以看我之前 ReentrentLock Condition文章)

//线程保持原因
//new Worker(firstTask); 
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    //重点关注newThread(this)
    this.thread = getThreadFactory().newThread(this);
}
//newThread(this) 那么直接找ThreadPoolExecutor的run()方法
public void run() {
    //往下看
    runWorker(this);
}
//运行worker 其他的不看 只看getTask()
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

//从任务队列中获取任务
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //关键点在这里 workQueue的方法poll和take都是阻塞方法 若取不到值则当前工作线程阻塞
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

总结:

线程池能保持线程持续运行的关键在于阻塞队列的使用, 主要做的是何时阻塞队列 何时把线程从阻塞中释放出来,  亮点是ctl一值多用  

以上是关于ThreadPoolExecutor 线程池的主要内容,如果未能解决你的问题,请参考以下文章

线程池(ThreadPoolExecutor)源码解析

线程池(ThreadPoolExecutor)源码解析

线程池(ThreadPoolExecutor)源码解析

高并发多线程基础之ThreadPoolExecutor源代码分析

如何获取线程池ThreadPoolExecutor正在运行的线程

线程池ThreadPoolExecutor分析