线程池的问题

Posted chengjunde

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池的问题相关的知识,希望对你有一定的参考价值。

面试-线程池的成长之路      

 

 

背景

相信大家在面试过程中遇到面试官问线程的很多,线程过后就是线程池了。从易到难,都是这么个过程,还有就是确实很多人在工作中接触线程池比较少,最多的也就是创建一个然后往里面提交线程,对于一些经验很丰富的面试官来说,一下就可以问出很多线程池相关的问题,与其被问的晕头转向,还不如好好学习。此时不努力更待何时。

什么是线程池?

线程池是一种多线程处理形式,处理过程中将任务提交到线程池,任务的执行交由线程池来管理。

如果每个请求都创建一个线程去处理,那么服务器的资源很快就会被耗尽,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

如果用生活中的列子来说明,我们可以把线程池当做一个客服团队,如果同时有1000个人打电话进行咨询,按照正常的逻辑那就是需要1000个客服接听电话,服务客户。现实往往需要考虑到很多层面的东西,比如:资源够不够,招这么多人需要费用比较多。正常的做法就是招100个人成立一个客服中心,当有电话进来后分配没有接听的客服进行服务,如果超出了100个人同时咨询的话,提示客户等待,稍后处理,等有客服空出来就可以继续服务下一个客户,这样才能达到一个资源的合理利用,实现效益的最大化。

Java中的线程池种类

1. newSingleThreadExecutor

创建方式:

  1. ExecutorService pool = Executors.newSingleThreadExecutor();

一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

使用方式:

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class ThreadPool {
  4. public static void main(String[] args) {
  5. ExecutorService pool = Executors.newSingleThreadExecutor();
  6. for (int i = 0; i < 10; i++) {
  7. pool.execute(() -> {
  8. System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
  9. });
  10. }
  11. }
  12. }

输出结果如下:

  1. pool-1-thread-1 开始发车啦....
  2. pool-1-thread-1 开始发车啦....
  3. pool-1-thread-1 开始发车啦....
  4. pool-1-thread-1 开始发车啦....
  5. pool-1-thread-1 开始发车啦....
  6. pool-1-thread-1 开始发车啦....
  7. pool-1-thread-1 开始发车啦....
  8. pool-1-thread-1 开始发车啦....
  9. pool-1-thread-1 开始发车啦....
  10. pool-1-thread-1 开始发车啦....

从输出的结果我们可以看出,一直只有一个线程在运行。

2.newFixedThreadPool

创建方式:

  1. ExecutorService pool = Executors.newFixedThreadPool(10);

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

使用方式:

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class ThreadPool {
  4. public static void main(String[] args) {
  5. ExecutorService pool = Executors.newFixedThreadPool(10);
  6. for (int i = 0; i < 10; i++) {
  7. pool.execute(() -> {
  8. System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
  9. });
  10. }
  11. }
  12. }

输出结果如下:

  1. pool-1-thread-1 开始发车啦....
  2. pool-1-thread-4 开始发车啦....
  3. pool-1-thread-3 开始发车啦....
  4. pool-1-thread-2 开始发车啦....
  5. pool-1-thread-6 开始发车啦....
  6. pool-1-thread-7 开始发车啦....
  7. pool-1-thread-5 开始发车啦....
  8. pool-1-thread-8 开始发车啦....
  9. pool-1-thread-9 开始发车啦....
  10. pool-1-thread-10 开始发车啦....

3. newCachedThreadPool

创建方式:

  1. ExecutorService pool = Executors.newCachedThreadPool();

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又添加新线程来处理任务。

使用方式如上2所示。

4.newScheduledThreadPool

创建方式:

  1. ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);

此线程池支持定时以及周期性执行任务的需求。

使用方式:

  1. import java.util.concurrent.Executors;
  2. import java.util.concurrent.ScheduledExecutorService;
  3. import java.util.concurrent.TimeUnit;
  4. public class ThreadPool {
  5. public static void main(String[] args) {
  6. ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
  7. for (int i = 0; i < 10; i++) {
  8. pool.schedule(() -> {
  9. System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
  10. }, 10, TimeUnit.SECONDS);
  11. }
  12. }
  13. }

上面演示的是延迟10秒执行任务,如果想要执行周期性的任务可以用下面的方式,每秒执行一次

  1. //pool.scheduleWithFixedDelay也可以
  2. pool.scheduleAtFixedRate(() -> {
  3. System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
  4. }, 1, 1, TimeUnit.SECONDS);

5.newWorkStealingPool
newWorkStealingPool是jdk1.8才有的,会根据所需的并行层次来动态创建和关闭线程,通过使用多个队列减少竞争,底层用的ForkJoinPool来实现的。ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

说说线程池的拒绝策略

当请求任务不断的过来,而系统此时又处理不过来的时候,我们需要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。

  • AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
  1. public static class AbortPolicy implements RejectedExecutionHandler {
  2. public AbortPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. throw new RejectedExecutionException("Task " + r.toString() +
  5. " rejected from " +
  6. e.toString());
  7. }
  8. }
  • CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。
  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2. public CallerRunsPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. if (!e.isShutdown()) {
  5. r.run();
  6. }
  7. }
  8. }
  • DiscardOleddestPolicy策略: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2. public DiscardOldestPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. if (!e.isShutdown()) {
  5. e.getQueue().poll();
  6. e.execute(r);
  7. }
  8. }
  9. }
  • DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。
  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2. public DiscardPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. }
  5. }

除了JDK默认为什么提供的四种拒绝策略,我们可以根据自己的业务需求去自定义拒绝策略,自定义的方式很简单,直接实现RejectedExecutionHandler接口即可

比如Spring integration中就有一个自定义的拒绝策略CallerBlocksPolicy,将任务插入到队列中,直到队列中有空闲并插入成功的时候,否则将根据最大等待时间一直阻塞,直到超时

  1. package org.springframework.integration.util;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.RejectedExecutionException;
  4. import java.util.concurrent.RejectedExecutionHandler;
  5. import java.util.concurrent.ThreadPoolExecutor;
  6. import java.util.concurrent.TimeUnit;
  7. import org.apache.commons.logging.Log;
  8. import org.apache.commons.logging.LogFactory;
  9. public class CallerBlocksPolicy implements RejectedExecutionHandler {
  10. private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class);
  11. private final long maxWait;
  12. /**
  13. * @param maxWait The maximum time to wait for a queue slot to be
  14. * available, in milliseconds.
  15. */
  16. public CallerBlocksPolicy(long maxWait) {
  17. this.maxWait = maxWait;
  18. }
  19. @Override
  20. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  21. if (!executor.isShutdown()) {
  22. try {
  23. BlockingQueue<Runnable> queue = executor.getQueue();
  24. if (logger.isDebugEnabled()) {
  25. logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds");
  26. }
  27. if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) {
  28. throw new RejectedExecutionException("Max wait time expired to queue task");
  29. }
  30. if (logger.isDebugEnabled()) {
  31. logger.debug("Task execution queued");
  32. }
  33. }
  34. catch (InterruptedException e) {
  35. Thread.currentThread().interrupt();
  36. throw new RejectedExecutionException("Interrupted", e);
  37. }
  38. }
  39. else {
  40. throw new RejectedExecutionException("Executor has been shut down");
  41. }
  42. }
  43. }

定义好之后如何使用呢?光定义没用的呀,一定要用到线程池中呀,可以通过下面的方式自定义线程池,指定拒绝策略。

  1. BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
  2. ThreadPoolExecutor executor = new ThreadPoolExecutor(
  3. 10, 100, 10, TimeUnit.SECONDS, workQueue, new CallerBlocksPolicy());

execute和submit的区别?

在前面的讲解中,我们执行任务是用的execute方法,除了execute方法,还有一个submit方法也可以执行我们提交的任务。

这两个方法有什么区别呢?分别适用于在什么场景下呢?我们来做一个简单的分析。

execute适用于不需要关注返回值的场景,只需要将线程丢到线程池中去执行就可以了

  1. public class ThreadPool {
  2. public static void main(String[] args) {
  3. ExecutorService pool = Executors.newFixedThreadPool(10);
  4. pool.execute(() -> {
  5. System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
  6. });
  7. }
  8. }

submit方法适用于需要关注返回值的场景,submit方法的定义如下:

  1. public interface ExecutorService extends Executor {
  2.   ...
  3.   <T> Future<T> submit(Callable<T> task);
  4.   <T> Future<T> submit(Runnable task, T result);
  5.   Future<?> submit(Runnable task);
  6.   ...
  7. }

其子类AbstractExecutorService实现了submit方法,可以看到无论参数是Callable还是Runnable,最终都会被封装成RunnableFuture,然后再调用execute执行。

  1. /**
  2. * @throws RejectedExecutionException {@inheritDoc}
  3. * @throws NullPointerException {@inheritDoc}
  4. */
  5. public Future<?> submit(Runnable task) {
  6. if (task == null) throw new NullPointerException();
  7. RunnableFuture<Void> ftask = newTaskFor(task, null);
  8. execute(ftask);
  9. return ftask;
  10. }
  11. /**
  12. * @throws RejectedExecutionException {@inheritDoc}
  13. * @throws NullPointerException {@inheritDoc}
  14. */
  15. public <T> Future<T> submit(Runnable task, T result) {
  16. if (task == null) throw new NullPointerException();
  17. RunnableFuture<T> ftask = newTaskFor(task, result);
  18. execute(ftask);
  19. return ftask;
  20. }
  21. /**
  22. * @throws RejectedExecutionException {@inheritDoc}
  23. * @throws NullPointerException {@inheritDoc}
  24. */
  25. public <T> Future<T> submit(Callable<T> task) {
  26. if (task == null) throw new NullPointerException();
  27. RunnableFuture<T> ftask = newTaskFor(task);
  28. execute(ftask);
  29. return ftask;
  30. }

下面我们来看看这三个方法分别如何去使用:

submit(Callable task);

  1. public class ThreadPool {
  2. public static void main(String[] args) throws Exception {
  3. ExecutorService pool = Executors.newFixedThreadPool(10);
  4. Future<String> future = pool.submit(new Callable<String>() {
  5. @Override
  6. public String call() throws Exception {
  7. return "Hello";
  8. }
  9. });
  10. String result = future.get();
  11. System.out.println(result);
  12. }
  13. }

submit(Runnable task, T result);

  1. public class ThreadPool {
  2. public static void main(String[] args) throws Exception {
  3. ExecutorService pool = Executors.newFixedThreadPool(10);
  4. Data data = new Data();
  5. Future<Data> future = pool.submit(new MyRunnable(data), data);
  6. String result = future.get().getName();
  7. System.out.println(result);
  8. }
  9. }
  10. class Data {
  11. String name;
  12. public String getName() {
  13. return name;
  14. }
  15. public void setName(String name) {
  16. this.name = name;
  17. }
  18. }
  19. class MyRunnable implements Runnable {
  20. private Data data;
  21. public MyRunnable(Data data) {
  22. this.data = data;
  23. }
  24. @Override
  25. public void run() {
  26. data.setName("yinjihuan");
  27. }
  28. }

Future<?> submit(Runnable task);
直接submit一个Runnable是拿不到返回值的,返回值就是null.

五种线程池的使用场景

  • newSingleThreadExecutor:一个单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。

  • newFixedThreadPool:一个固定大小的线程池,可以用于已知并发压力的情况下,对线程数做限制。

  • newCachedThreadPool:一个可以无限扩大的线程池,比较适合处理执行时间比较小的任务。

  • newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。

  • newWorkStealingPool:一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行。

线程池的关闭

关闭线程池可以调用shutdownNow和shutdown两个方法来实现

shutdownNow:对正在执行的任务全部发出interrupt(),停止执行,对还未开始执行的任务全部取消,并且返回还没开始的任务列表

  1. public class ThreadPool {
  2. public static void main(String[] args) throws Exception {
  3. ExecutorService pool = Executors.newFixedThreadPool(1);
  4. for (int i = 0; i < 5; i++) {
  5. System.err.println(i);
  6. pool.execute(() -> {
  7. try {
  8. Thread.sleep(30000);
  9. System.out.println("--");
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. }
  13. });
  14. }
  15. Thread.sleep(1000);
  16. List<Runnable> runs = pool.shutdownNow();
  17. }
  18. }

上面的代码模拟了立即取消的场景,往线程池里添加5个线程任务,然后sleep一段时间,线程池只有一个线程,如果此时调用shutdownNow后应该需要中断一个正在执行的任务和返回4个还未执行的任务,控制台输出下面的内容:

  1. 0
  2. 1
  3. 2
  4. 3
  5. 4
  6. [fs.ThreadPool$$Lambda$1/990368553@682a0b20,
  7. fs.ThreadPool$$Lambda$1/990368553@682a0b20,
  8. fs.ThreadPool$$Lambda$1/990368553@682a0b20,
  9. fs.ThreadPool$$Lambda$1/990368553@682a0b20]
  10. java.lang.InterruptedException: sleep interrupted
  11. at java.lang.Thread.sleep(Native Method)
  12. at fs.ThreadPool.lambda$0(ThreadPool.java:15)
  13. at fs.ThreadPool$$Lambda$1/990368553.run(Unknown Source)
  14. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  15. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  16. at java.lang.Thread.run(Thread.java:745)

shutdown:当我们调用shutdown后,线程池将不再接受新的任务,但也不会去强制终止已经提交或者正在执行中的任务

  1. public class ThreadPool {
  2. public static void main(String[] args) throws Exception {
  3. ExecutorService pool = Executors.newFixedThreadPool(1);
  4. for (int i = 0; i < 5; i++) {
  5. System.err.println(i);
  6. pool.execute(() -> {
  7. try {
  8. Thread.sleep(30000);
  9. System.out.println("--");
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. }
  13. });
  14. }
  15. Thread.sleep(1000);
  16. pool.shutdown();
  17. pool.execute(() -> {
  18. try {
  19. Thread.sleep(30000);
  20. System.out.println("--");
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. });
  25. }
  26. }

上面的代码模拟了正在运行的状态,然后调用shutdown,接着再往里面添加任务,肯定是拒绝添加的,请看输出结果:

  1. 0
  2. 1
  3. 2
  4. 3
  5. 4
  6. Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task fs.ThreadPool$$Lambda$2/1747585824@3d075dc0 rejected from java.util.concurrent.[email protected][Shutting down, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0]
  7. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
  8. at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
  9. at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
  10. at fs.ThreadPool.main(ThreadPool.java:24)

还有一些业务场景下需要知道线程池中的任务是否全部执行完成,当我们关闭线程池之后,可以用isTerminated来判断所有的线程是否执行完成,千万不要用isShutdown,isShutdown只是返回你是否调用过shutdown的结果。

  1. public class ThreadPool {
  2. public static void main(String[] args) throws Exception {
  3. ExecutorService pool = Executors.newFixedThreadPool(1);
  4. for (int i = 0; i < 5; i++) {
  5. System.err.println(i);
  6. pool.execute(() -> {
  7. try {
  8. Thread.sleep(3000);
  9. System.out.println("--");
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. }
  13. });
  14. }
  15. Thread.sleep(1000);
  16. pool.shutdown();
  17. while(true){
  18. if(pool.isTerminated()){
  19. System.out.println("所有的子线程都结束了!");
  20. break;
  21. }
  22. Thread.sleep(1000);
  23. }
  24. }
  25. }

自定义线程池

在实际的使用过程中,大部分我们都是用Executors去创建线程池直接使用,如果有一些其他的需求,比如指定线程池的拒绝策略,阻塞队列的类型,线程名称的前缀等等,我们可以采用自定义线程池的方式来解决。

如果只是简单的想要改变线程名称的前缀的话可以自定义ThreadFactory来实现,在Executors.new…中有一个ThreadFactory的参数,如果没有指定则用的是DefaultThreadFactory。

自定义线程池核心在于创建一个ThreadPoolExecutor对象,指定参数

下面我们看下ThreadPoolExecutor构造函数的定义:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) ;
  • corePoolSize
    线程池大小,决定着新提交的任务是新开线程去执行还是放到任务队列中,也是线程池的最最核心的参数。一般线程池开始时是没有线程的,只有当任务来了并且线程数量小于corePoolSize才会创建线程。
  • maximumPoolSize
    最大线程数,线程池能创建的最大线程数量。
  • keepAliveTime
    在线程数量超过corePoolSize后,多余空闲线程的最大存活时间。
  • unit
    时间单位
  • workQueue
    存放来不及处理的任务的队列,是一个BlockingQueue。
  • threadFactory
    生产线程的工厂类,可以定义线程名,优先级等。
  • handler
    拒绝策略,当任务来不及处理的时候,如何处理, 前面有讲解。

了解上面的参数信息后我们就可以定义自己的线程池了,我这边用ArrayBlockingQueue替换了LinkedBlockingQueue,指定了队列的大小,当任务超出队列大小之后使用CallerRunsPolicy拒绝策略处理。

这样做的好处是严格控制了队列的大小,不会出现一直往里面添加任务的情况,有的时候任务处理的比较慢,任务数量过多会占用大量内存,导致内存溢出。

当然你也可以在提交到线程池的入口进行控制,比如用CountDownLatch, Semaphore等。

  1. /**
  2. * 自定义线程池<br>
  3. * 默认的newFixedThreadPool里的LinkedBlockingQueue是一个无边界队列,如果不断的往里加任务,最终会导致内存的不可控<br>
  4. * 增加了有边界的队列,使用了CallerRunsPolicy拒绝策略
  5. * @author yinjihuan
  6. *
  7. */
  8. public class FangjiaThreadPoolExecutor {
  9. private static ExecutorService executorService = newFixedThreadPool(50);
  10. private static ExecutorService newFixedThreadPool(int nThreads) {
  11. return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
  12. new ArrayBlockingQueue<Runnable>(10000), new DefaultThreadFactory(), new CallerRunsPolicy());
  13. }
  14. public static void execute(Runnable command) {
  15. executorService.execute(command);
  16. }
  17. public static void shutdown() {
  18. executorService.shutdown();
  19. }
  20. static class DefaultThreadFactory implements ThreadFactory {
  21. private static final AtomicInteger poolNumber = new AtomicInteger(1);
  22. private final ThreadGroup group;
  23. private final AtomicInteger threadNumber = new AtomicInteger(1);
  24. private final String namePrefix;
  25. DefaultThreadFactory() {
  26. SecurityManager s = System.getSecurityManager();
  27. group = (s != null) ? s.getThreadGroup() :
  28. Thread.currentThread().getThreadGroup();
  29. namePrefix = "FSH-pool-" +
  30. poolNumber.getAndIncrement() +
  31. "-thread-";
  32. }
  33. public Thread newThread(Runnable r) {
  34. Thread t = new Thread(group, r,
  35. namePrefix + threadNumber.getAndIncrement(),
  36. 0);
  37. if (t.isDaemon())
  38. t.setDaemon(false);
  39. if (t.getPriority() != Thread.NORM_PRIORITY)
  40. t.setPriority(Thread.NORM_PRIORITY);
  41. return t;
  42. }
  43. }
  44. }

以上是关于线程池的问题的主要内容,如果未能解决你的问题,请参考以下文章

Java——线程池

Java线程池详解

Java线程池详解

Java 线程池详解

Java线程池详解

线程池的使用场景和代码实现!