12-线程池

Posted mingmingn

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了12-线程池相关的知识,希望对你有一定的参考价值。

线程池定义

线程池顾名思义是事先创建若干可执行的线程放入一个池(容器)中,需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而放回池中,从而减少创建和销毁对象的开销.

线程池优点

降低资源消耗
提高响应速度
提高线程的可管理性

如何设计线程池

简陋版

设计

1.首先要有一个池子(容器)
2.池子具备开启/初始化/关闭功能
3.获取线程
4.归还线程

示意图

技术图片

 

 

 问题

1.在获取线程的时候,线程池没有线程可以获取的情况怎么处理?
2.初始化线程池时候,初始化多少个线程才算合适?
3.对于客户端使用不够方便,使用之后还要归还线程?不好使用

改进版

设计

1.添加缓冲数组

2.内部封装获取线程和归还线程,用户只要提交任务就可以异步获取结果.

示意图

技术图片

 

 

 问题

1.任务队列多长才好
2.队列满了之后怎么办?应该采取什么策略
3.线程池初始化,初始化多少线程才合适?

线程池核心参数

 1 /**
 2  * Creates a new {@code ThreadPoolExecutor} with the given initial
 3  * parameters.
 4  *
 5  * @param corePoolSize 最大核心线程数
 6  * @param maximumPoolSize 最大线程数量
 7  * @param keepAliveTime 线程空闲后的存活时间
 8  * @param unit 时间单位
 9  * @param workQueue 用于存放任务的阻塞队列
10  * @param threadFactory 线程工厂类
11  * @param handler 当队列和最大线程池都满了之后的饱和策略
12  */
13 public ThreadPoolExecutor(int corePoolSize,
14                           int maximumPoolSize,
15                           long keepAliveTime,
16                           TimeUnit unit,
17                           BlockingQueue<Runnable> workQueue,
18                           ThreadFactory threadFactory,
19                           RejectedExecutionHandler handler) {...}

线程池的处理流程

技术图片

 

 

线程池可选择的阻塞队列

什么是阻塞队列

当阻塞队列为空时,从队列中获取元素的操作将会被阻塞。
当阻塞队列为满时,从队列里添加元素的操作将会被阻塞。

队列类型

1.无界队列
2.有界队列
3.同步移交队列

  SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是 不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并     不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。

代码

 1 import org.junit.Test;
 2 
 3 import java.util.concurrent.ArrayBlockingQueue;
 4 import java.util.concurrent.LinkedBlockingQueue;
 5 import java.util.concurrent.SynchronousQueue;
 6 
 7 /**
 8  * @description:阻塞队列的使用
 9  */
10 public class QueueTest {
11 
12     /**
13      * 有界队列,队列容量为10
14      * @throws InterruptedException
15      */
16     @Test
17     public void arrayBlockingQueue() throws InterruptedException {
18         ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
19 
20         for (int i = 0; i < 20; i++) {
21             queue.put(i);
22             System.out.println("向队列中添加值: " + i);
23         }
24         //向队列中添加值: 0
25         //向队列中添加值: 1
26         //向队列中添加值: 2
27         //向队列中添加值: 3
28         //向队列中添加值: 4
29         //向队列中添加值: 5
30         //向队列中添加值: 6
31         //向队列中添加值: 7
32         //向队列中添加值: 8
33         //向队列中添加值: 9
34     }
35 
36     /**
37      * 基于链表的有界/无界阻塞队列
38      * @throws InterruptedException
39      */
40     @Test
41     public void linkedBlockingQueue() throws InterruptedException {
42 //        LinkedBlockingQueue queue = new LinkedBlockingQueue<Integer>(10);
43         //不加队列长度.默认最大长度,无界队列
44         LinkedBlockingQueue queue = new LinkedBlockingQueue<Integer>();
45 
46         for (int i = 0; i < 20; i++) {
47             queue.put(i);
48             System.out.println("向队列中添加值: " + i);
49         }
50         //向队列中添加值: 0
51         //向队列中添加值: 1
52         //向队列中添加值: 2
53         //向队列中添加值: 3
54         //向队列中添加值: 4
55         //向队列中添加值: 5
56         //向队列中添加值: 6
57         //向队列中添加值: 7
58         //向队列中添加值: 8
59         //向队列中添加值: 9
60         //向队列中添加值: 10
61         //向队列中添加值: 11
62         //向队列中添加值: 12
63         //向队列中添加值: 13
64         //向队列中添加值: 14
65         //向队列中添加值: 15
66         //向队列中添加值: 16
67         //向队列中添加值: 17
68         //向队列中添加值: 18
69         //向队列中添加值: 19
70     }
71 
72     @Test
73     public void synchronousQueue(){
74         /**
75          * 同步移交队列
76          */
77         SynchronousQueue<Integer> queue = new SynchronousQueue<>();
78 
79         //插入值
80         new Thread(() -> {
81             try {
82                 queue.put(1);
83                 System.out.println("插入成功");
84             } catch (InterruptedException e) {
85                 e.printStackTrace();
86             }
87         }).start();
88 
89         //删除值
90         new Thread(() -> {
91             try {
92                 queue.take();
93                 System.out.println("删除成功");
94             } catch (InterruptedException e) {
95                 e.printStackTrace();
96             }
97         }).start();
98     }
99 }

线程池可选择的饱和策略

AbortPolicy终止策略(默认)

 1     /**
 2      * A handler for rejected tasks that throws a
 3      * {@code RejectedExecutionException}.
 4      */
 5     public static class AbortPolicy implements RejectedExecutionHandler {
 6         /**
 7          * Creates an {@code AbortPolicy}.
 8          */
 9         public AbortPolicy() { }
10 
11         /**
12          * Always throws RejectedExecutionException.
13          *
14          * @param r the runnable task requested to be executed
15          * @param e the executor attempting to execute this task
16          * @throws RejectedExecutionException always
17          */
18         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
19             throw new RejectedExecutionException("Task " + r.toString() +
20                                                  " rejected from " +
21                                                  e.toString());
22         }
23     }

DiscardPolicy抛弃策略

 1     /**
 2      * A handler for rejected tasks that silently discards the
 3      * rejected task.
 4      */
 5     public static class DiscardPolicy implements RejectedExecutionHandler {
 6         /**
 7          * Creates a {@code DiscardPolicy}.
 8          */
 9         public DiscardPolicy() { }
10 
11         /**
12          * Does nothing, which has the effect of discarding task r.
13          *
14          * @param r the runnable task requested to be executed
15          * @param e the executor attempting to execute this task
16          */
17         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
18         }
19     }

DiscardOldestPolicy抛弃旧任务策略

 1     /**
 2      * A handler for rejected tasks that discards the oldest unhandled
 3      * request and then retries {@code execute}, unless the executor
 4      * is shut down, in which case the task is discarded.
 5      */
 6     public static class DiscardOldestPolicy implements RejectedExecutionHandler {
 7         /**
 8          * Creates a {@code DiscardOldestPolicy} for the given executor.
 9          */
10         public DiscardOldestPolicy() { }
11 
12         /**
13          * Obtains and ignores the next task that the executor
14          * would otherwise execute, if one is immediately available,
15          * and then retries execution of task r, unless the executor
16          * is shut down, in which case task r is instead discarded.
17          *
18          * @param r the runnable task requested to be executed
19          * @param e the executor attempting to execute this task
20          */
21         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
22             if (!e.isShutdown()) {
23                 e.getQueue().poll();
24                 e.execute(r);
25             }
26         }
27     }

CallerRunsPolicy调用者运行策略

 1     /**
 2      * A handler for rejected tasks that runs the rejected task
 3      * directly in the calling thread of the {@code execute} method,
 4      * unless the executor has been shut down, in which case the task
 5      * is discarded.
 6      */
 7     public static class CallerRunsPolicy implements RejectedExecutionHandler {
 8         /**
 9          * Creates a {@code CallerRunsPolicy}.
10          */
11         public CallerRunsPolicy() { }
12 
13         /**
14          * Executes task r in the caller‘s thread, unless the executor
15          * has been shut down, in which case the task is discarded.
16          *
17          * @param r the runnable task requested to be executed
18          * @param e the executor attempting to execute this task
19          */
20         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
21             if (!e.isShutdown()) {
22                 r.run();
23             }
24         }
25     }

线程池执行示意图

技术图片

 

 

 常用线程池

Executors.newCacheThreadPool()

可缓存线程池,先查看池中有没有以前建立的线程,如果有,就直接使用。如果没有,就建一个新的线程加入池中,缓存型池子通常用于执行一些生存期很短的异步型任务

Executors.newFixedThreadPool(int n)

创建一个可重用固定个数的线程池,以共享的无界队列方式来运行这些线程

Executors.newScheduledThreadPool(int n)

创建一个定长线程池,支持定时及周期性任务执行

Executors.newSingleThreadExecutor()

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

向线程池提交任务

 1 import org.junit.Test;
 2 
 3 import java.util.concurrent.ExecutionException;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.Future;
 7 
 8 public class RunTest {
 9 
10     @Test
11     public void submitTest()
12             throws ExecutionException, InterruptedException {
13 
14         // 创建线程池
15         ExecutorService threadPool =
16                 Executors.newCachedThreadPool();
17 
18         /**
19          * 利用submit方法提交任务,接收任务的返回结果
20          */
21         Future<Integer> future = threadPool.submit(() -> {
22             Thread.sleep(1000L * 10);
23 
24             return 2 * 5;
25         });
26 
27         /**
28          * 阻塞方法,直到任务有返回值后,才向下执行
29          */
30         Integer num = future.get();
31 
32         System.out.println("执行结果:" + num);
33     }
34 
35     @Test
36     public void executeTest() throws InterruptedException {
37         // 创建线程池
38         ExecutorService threadPool =
39                 Executors.newCachedThreadPool();
40 
41         /**
42          * 利用execute方法提交任务,没有返回结果
43          */
44         threadPool.execute(() -> {
45             try {
46                 Thread.sleep(1000L * 10);
47             } catch (InterruptedException e) {
48                 e.printStackTrace();
49             }
50 
51             Integer num = 2 * 5;
52             System.out.println("执行结果:" + num);
53         });
54 
55 
56 
57         Thread.sleep(1000L * 1000);
58     }
59 
60 }

线程池的状态

技术图片

 

以上是关于12-线程池的主要内容,如果未能解决你的问题,请参考以下文章

Java——线程池

Motan在服务provider端用于处理request的线程池

Java线程池详解

Java线程池详解

Java 线程池详解

线程池-实现一个取消选项