浅谈Java线程池

Posted 罗记Tom

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈Java线程池相关的知识,希望对你有一定的参考价值。

1. 什么是线程池?

很简单,简单看名字就知道是装有线程的池子,我们可以把要执行的多线程交给线程池来处理,和数据库连接池的概念一样,通过维护一定数量的线程池来达到多个线程的复用。

2. 线程池的好处

我们知道不用线程池的话,每个线程都要通过new Thread(xxRunnable).start()的方式来创建并运行一个线程,线程少的话这不会是问题,而真实环境可能会开启多个线程让系统和程序达到最佳效率,当线程数达到一定数量就会耗尽系统的CPU和内存资源,也会造成GC频繁收集和停顿,因为每次创建和销毁一个线程都是要消耗系统资源的,如果为每个任务都创建线程这无疑是一个很大的性能瓶颈。所以,线程池中的线程复用极大节省了系统资源,当线程一段时间不再有任务处理时它也会自动销毁,而不会长驻内存。

3. 线程池核心类

在java.util.concurrent包中我们能找到线程池的定义,其中ThreadPoolExecutor是我们线程池核心类,首先看看线程池类的主要参数有哪些。

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*/
public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler) {
       if (corePoolSize < 0 ||
           maximumPoolSize <= 0 ||
           maximumPoolSize < corePoolSize ||
           keepAliveTime < 0)
           throw new IllegalArgumentException();
       if (workQueue == null || threadFactory == null || handler == null)
           throw new NullPointerException();
       this.acc = System.getSecurityManager() == null ?
               null :
               AccessController.getContext();
       this.corePoolSize = corePoolSize;
       this.maximumPoolSize = maximumPoolSize;
       this.workQueue = workQueue;
       this.keepAliveTime = unit.toNanos(keepAliveTime);
       this.threadFactory = threadFactory;
       this.handler = handler;
}
参数名 作用
corePoolSize 队列没满时,线程最大并发数
maximumPoolSizes 队列满后线程能够达到的最大并发数
keepAliveTime 空闲线程过多久被回收的时间限制
unit keepAliveTime 的时间单位
workQueue 阻塞的队列类型
RejectedExecutionHandler 超出 maximumPoolSizes + workQueue 时,任务会交给RejectedExecutionHandler来处理

4. 线程池工作流程

1、如果线程池中的线程小于corePoolSize时就会创建新线程直接执行任务。2、如果线程池中的线程大于corePoolSize时就会暂时把任务存储到工作队列workQueue中等待执行。3、如果工作队列workQueue也满时,当线程数小于最大线程池数maximumPoolSize时就会创建新线程来处理,而线程数大于等于最大线程池数maximumPoolSize时就会执行拒绝策略。

5. 线程池分类

Executors是jdk里面提供的创建线程池的工厂类,它默认提供了4种常用的线程池应用,而不必我们去重复构造。

  • newFixedThreadPool固定线程池,核心线程数和最大线程数固定相等,而空闲存活时间为0毫秒,说明此参数也无意义,工作队列为最大为Integer.MAX_VALUE大小的阻塞队列。当执行任务时,如果线程都很忙,就会丢到工作队列等有空闲线程时再执行,队列满就执行默认的拒绝策略。

 /**
 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue, using the provided
 * ThreadFactory to create new threads when needed.
 */
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>(),
                                     threadFactory);
}
  • newCachedThreadPool带缓冲线程池,从构造看核心线程数为0,最大线程数为Integer最大值大小,超过0个的空闲线程在60秒后销毁,SynchronousQueue这是一个直接提交的队列,意味着每个新任务都会有线程来执行,如果线程池有可用线程则执行任务,没有的话就创建一个来执行,线程池中的线程数不确定,一般建议执行速度较快较小的线程,不然这个最大线程池边界过大容易造成内存溢出。

 /**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available. These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 */
public static ExecutorService newCachedThreadPool() {
       return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                     60L, TimeUnit.SECONDS,
                                     new SynchronousQueue<Runnable>());
}
  • newSingleThreadExecutor单线程线程池,核心线程数和最大线程数均为1,空闲线程存活0毫秒同样无意思,意味着每次只执行一个线程,多余的先存储到工作队列,一个一个执行,保证了线程的顺序执行。

/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
*/
public static ExecutorService newSingleThreadExecutor() {
   return new FinalizableDelegatedExecutorService
      (new ThreadPoolExecutor(1, 1,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>()));
}
  • newScheduledThreadPool调度线程池,即按一定的周期执行任务,即定时任务,对ThreadPoolExecutor进行了包装而已。

/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
   return new ScheduledThreadPoolExecutor(corePoolSize);
}

6. 拒绝策略

  • AbortPolicy简单粗暴,直接抛出拒绝异常,这也是默认的拒绝策略。

  • CallerRunsPolicy如果线程池未关闭,则会在调用者线程中直接执行新任务,这会导致主线程提交线程性能变慢。

  • DiscardPolicy从方法看没做任务操作,即表示不处理新任务,即丢弃。

  • DiscardOldestPolicy抛弃最老的任务,就是从队列取出最老的任务然后放入新的任务进行执行。

7. 如何提交线程

可以先随便定义一个固定大小的线程池

ExecutorService es = Executors.newFixedThreadPool(3);

提交一个线程

es.submit(xxRunnble);
es.execute(xxRunnble);

submit和execute分别有什么区别呢?

  1. execute没有返回值,如果不需要知道线程的结果就使用execute方法,性能会好很多。

  2. submit返回一个Future对象,如果想知道线程结果就使用submit提交,而且它能在主线程中通过Future的get方法捕获线程中的异常。

execute()到底方法如何处理

  1. 获取当前线程池的状态。

  2. 当前线程数量小于coreSize时创建一个新的线程运行。

  3. 如果当前线程处于运行状态,并且写入阻塞队列成功。

  4. 双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕,同时执行拒绝策略。

  5. 如果当前线程池为空就新创建一个线程并执行。

  6. 如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略。

8. 如何关闭线程池

  • shutdownNow():立即关闭线程池(暴力),正在执行中的及队列中的任务会被中断,同时该方法会返回被中断的队列中的任务列表

  • shutdown():平滑关闭线程池,正在执行中的及队列中的任务能执行完成,后续进来的任务会被执行拒绝策略

  • isTerminated():当正在执行的任务及对列中的任务全部都执行(清空)完就会返回true


9.手动创建线程池(推荐)

上面说的使用Executors工具类创建的线程池存有隐患,那如何使用才能避免这个隐患呢?对症下药,建立自己的线程工厂类,灵活设置关键参数:

//这里默认拒绝策略为AbortPolicy
private static ExecutorService executor = new ThreadPoolExecutor(10,10,60L, TimeUnit.SECONDS,new ArrayBlockingQueue(10));

使用guava包中的ThreadFactoryBuilder工厂类来构造线程池:

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().build();

private static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), threadFactory, new ThreadPoolExecutor.AbortPolicy());

通过guava的ThreadFactory工厂类还可以指定线程组名称,这对于后期定位错误时也是很有帮助的

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-d%").build();


10.springboot中使用线程池

springboot可以说是非常流行了,下面说说如何在springboot中优雅的使用线程池

@Configuration
public class ThreadPoolConfig {
@Bean(value = "threadPoolInstance")
public ExecutorService createThreadPoolInstance() {
//通过guava类库的ThreadFactoryBuilder来实现线程工厂类并设置线程名称
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-%d").build();
ExecutorService threadPool = new ThreadPoolExecutor(10, 16, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100), threadFactory, new ThreadPoolExecutor.AbortPolicy());

return threadPool;
}
}


//通过name=threadPoolInstance引用线程池实例
@Resource(name = "threadPoolInstance")
private ExecutorService executorService;

@Override
public void spikeConsumer() {
//TODO
executorService.execute(new Runnable() {
@Override
public void run() {
//TODO
}});
}

11.其他相关

在ThreadPoolExecutor类中有两个比较重要的方法引起了我们的注意:beforeExecute和afterExecute

 protected void beforeExecute(Thread var1, Runnable var2) { }

protected void afterExecute(Runnable var1, Throwable var2) {}

这两个方法是protected修饰的,很显然是留给开发人员去重写方法体实现自己的业务逻辑,非常适合做钩子函数,在任务run方法的前后增加业务逻辑,比如添加日志、统计等。这个和我们springmvc中拦截器的preHandle和afterCompletion方法很类似,都是对方法进行环绕,类似于spring的AOP.


  • Callable和Runnable

Runnable和Callable都可以理解为任务,里面封装这任务的具体逻辑,用于提交给线程池执行,区别在于Runnable任务执行没有返回值,且Runnable任务逻辑中不能通过throws抛出cheched异常(但是可以try catch),而Callable可以获取到任务的执行结果返回值且抛出checked异常。

@FunctionalInterface
public interface Runnable {
public abstract void run();
}

@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}


  • Future和FutureTask

Future接口用来表示执行异步任务的结果存储器,当一个任务的执行时间过长就可以采用这种方式:把任务提交给子线程去处理,主线程不用同步等待,当向线程池提交了一个Callable或Runnable任务时就会返回Future,用Future可以获取任务执行的返回结果。Future的主要方法包括:

  • get()方法:返回任务的执行结果,若任务还未执行完,则会一直阻塞直到完成为止,如果执行过程中发生异常,则抛出异常,但是主线程是感知不到并且不受影响的,除非调用get()方法进行获取结果则会抛出ExecutionException异常;

  • get(long timeout, TimeUnit unit):在指定时间内返回任务的执行结果,超时未返回会抛出TimeoutException,这个时候需要显式的取消任务;

  • cancel(boolean mayInterruptIfRunning):取消任务,boolean类型入参表示如果任务正在运行中是否强制中断;

  • isDone():判断任务是否执行完毕,执行完毕不代表任务一定成功执行,比如任务执行失但也执行完毕、任务被中断了也执行完毕都会返回true,它仅仅表示一种状态说后面任务不会再执行了;

  • isCancelled():判断任务是否被取消;

下面来实际演示Future和FutureTask的用法:

public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<Integer> future = executorService.submit(new Task());
Integer integer = future.get();
System.out.println(integer);
executorService.shutdown();
}

static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程开始计算");
int sum = 0;
for (int i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}
}


public static void main(String[] args) throws ExecutionException, InterruptedException{
ExecutorService executorService = Executors.newFixedThreadPool(10);
FutureTask<Integer> futureTask = new FutureTask<>(new Task());
executorService.submit(futureTask);
Integer integer = futureTask.get();
System.out.println(integer);
executorService.shutdown();
}

static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程开始计算");
int sum = 0;
for (int i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}
}


以上是关于浅谈Java线程池的主要内容,如果未能解决你的问题,请参考以下文章

线程池浅谈

Java——线程池

Java线程池详解

Java线程池详解

Java 线程池详解

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段