线程池

Posted Carl_Hugo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池相关的知识,希望对你有一定的参考价值。

JAVA线程池的实现

使用线程池的好处
  1. 降低资源消耗
  2. 提高响应速度
  3. 提高线程的可管理性

Executor框架

作用:线程工厂,可以通过Executors创建特定功能的线程池。

线程池的种类

1. newFixedThreadPool(num)

固定数量的线程池,线程池中只存在核心线程,线程池中的核心线程数等于最大线程数,当有新的任务来临时不会开辟新的线程来处理任务。(LinkedBlockingQueue)。(一开始就创建num个线程)

2. newCachedThreadPool()

缓存线程池,线程池中线程数量不确定,理论上可达Integer.MAX_VALUE,线程池中线程都是非核心线程。线程空闲时间超过keepAliveTime会自动被回收(SynchronousQueue)。

3. newScheduledThreadPool(num)

任务线程池,线程池中核心线程的数量固定,对于非核心线程的数量不限制,非核心线程存在超时回收机制。主要用于执行定时任务和周期性任务的场景。

4. newSingleThreadExecutor()

单一线程池,非核心线程,主要确保任务在同一线程中顺序执行,类似于同步。(LinkedBlockingQueue)。

创建线程池的参数

ThreadPoolExecutor源码

public ThreadPoolExecutor(int corePoolSize,//核心线程数
                              int maximumPoolSize,//最大线程数
                              long keepAliveTime,//线程活动保持时间
                              TimeUnit unit,//keepAliveTime的单位
                              BlockingQueue<Runnable> workQueue,//阻塞队列
                              ThreadFactory threadFactory,//线程工厂
                              RejectedExecutionHandler handler)//拒绝策略
  1. corePoolSize:核心线程数,当提交一个任务到线程池中,线程池会创建一个线程来执行任务,直到线程数达到corePoolSize时不再创建,这个时候把提交的新任务放入阻塞队列,如果调用preStartAllCoreThread(),则会在创建线程池的时候初始化核心线程。
  2. maximunPoolSize:允许创建的最大线程数,如果阻塞队列满,同时创建的线程数小于最大线程数,就会创建新的线程来执行任务。
  3. keepAliveTime:非核心线程空闲时的存活时间,超过时间线程会被终止。
  4. unit:keepAliveTime的时间单位。
  5. workQueue:阻塞队列,有四种阻塞队列类型,ArrayBlockingQueue(基于数组)、LinkedBlockingQueue(基于链表)、SynchronousQueue(不存储元素)、PriorityBlockingQueue(具有优先级的);
  6. threadFactory:用于创建线程的线程工厂。
  7. handler:阻塞队列满且没有空闲线程,执行自定义处理策略。

任务添加到线程池之后的运行流程

调用submit或者execute方法。

调用submit(Runnable runTask)方法,需要传入一个实现Callable接口的对象,在任务执行完后通过Future对象获得任务的返回值。submit(Runnable runTask)内部实际上还是执行execute方法。

调用execute方法是不能获得任务执行结束后的返回值的,且不会抛出异常。

线程池存在的状态

ctl(AtomicInteger):利用低29位表示线程池中的线程数,高3位表示线程池的运行状态。

RUNNING:线程池会接受任务,并处理阻塞队列中的任务(高3位为111)。

SHUTDOWN:线程池不会接受任务,但是会处理阻塞队列中的任务(高3位为000)。

STOP:线程池不会接受新的任务,不会处理阻塞队列中的任务,中断正在执行的任务(高3位位001)。

TIDYING:过渡阶段,所有任务都执行结束,线程池中不存在有效线程,并且将调用terminated方法。(高三位010)

TERMINATED:终止状态(高三位011)

  • shutdown()-线程池状态从Running转换为shutdown
  • shutdownNow()-线程池状态从Running/Shutdown转换为Stop
  • 阻塞队列为空,线程池为空,线程池状态从shutdown转换为Tidying
  • 线程池为空,线程池状态从Stop转换为Tidying
  • Terminated()-线程池状态从Tidying转换为Terminate
Executor方法
  1. 判断当前线程池中线程数量是否小于corePoolSIze,如果小于则通过addWorker方法创建一个新的Worker对象执行当前的任务。

  2. 如果当前线程池中线程数量大于corePoolSize,尝试将当前任务添加到阻塞队列中;二次检查线程池中的状态,如果线程池不在Running状态,则将刚刚添加到阻塞队列中的任务移除,同时拒绝当前任务请求;

    如果二次检查发现当前线程池处于Running状态,那么会检查当前线程池中工作线程数量是否为0,如果为0,就通过addWorker创建一个Worker对象处理阻塞队列中的任务。

  3. 如果原先线程池不处于Running状态或者将当前任务添加到阻塞队列出错时,就通过addWorker创建新的Worker对象处理当前任务,如果添加失败,则拒绝当前任务。

executor方法仅仅检查当前线程池中线程数量有没有超过corePoolSize,maximumPoolSize是通过addWorker方法检测的。

int wc = workerCountOf(c);
if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;

当线程数量超过maximumPoolSize,就会调用return false;

一个线程池中线程的数量实际就是这个线程池中Worker的数量,如果Worker大小超过超过corePoolSize,那么任务都在阻塞队列中。Worker是对Java中任务的一个封装。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
Worker(Runnable firstTask) 
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        

Worker实现Runnable接口,通过构造方法可知,一般传入的对象也实现了Callable或者Runnable接口(在调用submit()会将Callable对象封装成实现了Runnable接口的对象的)。

public <T> Future<T> submit(Callable<T> task) 
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    

Worker对象创建后会执行内部的run方法,在run方法中会执行runWorker方法,如果传入的任务不为空,则执行任务。为空则退出runWorker方法。

getTask()返回的任务为空的情况
  1. worker数量超过maximumPoolSize
  2. 线程池状态为Stop
  3. 线程池状态为Shutdown且阻塞队列为空
  4. 通过定时从阻塞队列中获取数据,但是超时后仍然未能获取。
线程池运行的4个阶段
  1. poolSize

线程池时如何重用线程的

线程重用是在getTask()中进行实现的。

  1. getTask()中存在两个for循环,它不断从阻塞队列中取出需要执行的任务,返回给runWorker()。
  2. runWorker()方法判断返回对象,只要不为空就执行run()处理它。直到接受到getTask()里传来的空值(阻塞队列中没有任务),这样一个线程处理完一个任务接着处理阻塞队列中的下一个任务。
  3. 线程池中不同线程也是可以并发处理阻塞队列中的任务的。
  4. 当阻塞队列中不存在任务时就会判断是否需要回收Worker对象。

线程池的关闭

shutdown() - 不会影响阻塞队列中的任务执行,但是会拒绝执行新加入的任务同时回收闲置worker。

shutdownNow() - 线程池状态切换为stop,不会处理阻塞中任务,也不会处理新加入的任务,同时回收所有worker。

自定义线程池使用详解

1. 使用有界队列
  1. corePoolSize-线程池实际线程数小于corePoolSize,则优先创建线程;若大于corePoolSize则会将任务加入队列。
  2. maximumPoolSize-如果队列已满,则在总线程数不大于maximumPoolSize的前提下创建新的线程,如果线程数大于maximumPoolSize,则执行拒绝策略。
  3. keepAliveTime-线程没有任务执行时继续存活的时间。

下限:corePoolSize 上限:maximumPoolSize 缓冲:有界队列

ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,//coreSize
                2,//MaxSize
                60,//keepAliveTime
                TimeUnit.SECONDS, //时间单位
                new ArrayBlockingQueue<Runnable>(3),//指定有界队列
                new MyRejected()
                );
2.使用无界队列

无界队列不存在入队失败的情况。(除非系统资源耗尽)

  1. corePoolSize-当有新的线程到来时,系统线程数小于corePoolSize,则新建线程执行任务。当达到corePoolSize后,就不会继续增加,若没有空闲的线程资源,则直接进入队列等待。

下限:corePoolSize 上限:无 缓冲:有界队列

ExecutorService executor = new ThreadPoolExecutor(
                5,//core
                10,//max
                120L,//2min
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),);

以上是关于线程池的主要内容,如果未能解决你的问题,请参考以下文章

java泛型上限下限,通配符

java泛型上限下限,通配符

如何在R中找到函数的上限和下限

期权价格的上限和下限

SQLite 优化:同时搜索下限和上限

分页上限和下限