12-线程池
Posted mingmingn
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了12-线程池相关的知识,希望对你有一定的参考价值。
线程池定义
线程池顾名思义是事先创建若干可执行的线程放入一个池(容器)中,需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而放回池中,从而减少创建和销毁对象的开销.
线程池优点
降低资源消耗
提高响应速度
提高线程的可管理性
如何设计线程池
简陋版
设计
1.首先要有一个池子(容器)
2.池子具备开启/初始化/关闭功能
3.获取线程
4.归还线程
示意图
问题
1.在获取线程的时候,线程池没有线程可以获取的情况怎么处理?
2.初始化线程池时候,初始化多少个线程才算合适?
3.对于客户端使用不够方便,使用之后还要归还线程?不好使用
改进版
设计
1.添加缓冲数组
2.内部封装获取线程和归还线程,用户只要提交任务就可以异步获取结果.
示意图
问题
1.任务队列多长才好
2.队列满了之后怎么办?应该采取什么策略
3.线程池初始化,初始化多少线程才合适?
线程池核心参数
1 /** 2 * Creates a new {@code ThreadPoolExecutor} with the given initial 3 * parameters. 4 * 5 * @param corePoolSize 最大核心线程数 6 * @param maximumPoolSize 最大线程数量 7 * @param keepAliveTime 线程空闲后的存活时间 8 * @param unit 时间单位 9 * @param workQueue 用于存放任务的阻塞队列 10 * @param threadFactory 线程工厂类 11 * @param handler 当队列和最大线程池都满了之后的饱和策略 12 */ 13 public ThreadPoolExecutor(int corePoolSize, 14 int maximumPoolSize, 15 long keepAliveTime, 16 TimeUnit unit, 17 BlockingQueue<Runnable> workQueue, 18 ThreadFactory threadFactory, 19 RejectedExecutionHandler handler) {...}
线程池的处理流程
线程池可选择的阻塞队列
什么是阻塞队列
当阻塞队列为空时,从队列中获取元素的操作将会被阻塞。
当阻塞队列为满时,从队列里添加元素的操作将会被阻塞。
队列类型
1.无界队列
2.有界队列
3.同步移交队列
SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是 不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并 不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。
代码
1 import org.junit.Test; 2 3 import java.util.concurrent.ArrayBlockingQueue; 4 import java.util.concurrent.LinkedBlockingQueue; 5 import java.util.concurrent.SynchronousQueue; 6 7 /** 8 * @description:阻塞队列的使用 9 */ 10 public class QueueTest { 11 12 /** 13 * 有界队列,队列容量为10 14 * @throws InterruptedException 15 */ 16 @Test 17 public void arrayBlockingQueue() throws InterruptedException { 18 ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10); 19 20 for (int i = 0; i < 20; i++) { 21 queue.put(i); 22 System.out.println("向队列中添加值: " + i); 23 } 24 //向队列中添加值: 0 25 //向队列中添加值: 1 26 //向队列中添加值: 2 27 //向队列中添加值: 3 28 //向队列中添加值: 4 29 //向队列中添加值: 5 30 //向队列中添加值: 6 31 //向队列中添加值: 7 32 //向队列中添加值: 8 33 //向队列中添加值: 9 34 } 35 36 /** 37 * 基于链表的有界/无界阻塞队列 38 * @throws InterruptedException 39 */ 40 @Test 41 public void linkedBlockingQueue() throws InterruptedException { 42 // LinkedBlockingQueue queue = new LinkedBlockingQueue<Integer>(10); 43 //不加队列长度.默认最大长度,无界队列 44 LinkedBlockingQueue queue = new LinkedBlockingQueue<Integer>(); 45 46 for (int i = 0; i < 20; i++) { 47 queue.put(i); 48 System.out.println("向队列中添加值: " + i); 49 } 50 //向队列中添加值: 0 51 //向队列中添加值: 1 52 //向队列中添加值: 2 53 //向队列中添加值: 3 54 //向队列中添加值: 4 55 //向队列中添加值: 5 56 //向队列中添加值: 6 57 //向队列中添加值: 7 58 //向队列中添加值: 8 59 //向队列中添加值: 9 60 //向队列中添加值: 10 61 //向队列中添加值: 11 62 //向队列中添加值: 12 63 //向队列中添加值: 13 64 //向队列中添加值: 14 65 //向队列中添加值: 15 66 //向队列中添加值: 16 67 //向队列中添加值: 17 68 //向队列中添加值: 18 69 //向队列中添加值: 19 70 } 71 72 @Test 73 public void synchronousQueue(){ 74 /** 75 * 同步移交队列 76 */ 77 SynchronousQueue<Integer> queue = new SynchronousQueue<>(); 78 79 //插入值 80 new Thread(() -> { 81 try { 82 queue.put(1); 83 System.out.println("插入成功"); 84 } catch (InterruptedException e) { 85 e.printStackTrace(); 86 } 87 }).start(); 88 89 //删除值 90 new Thread(() -> { 91 try { 92 queue.take(); 93 System.out.println("删除成功"); 94 } catch (InterruptedException e) { 95 e.printStackTrace(); 96 } 97 }).start(); 98 } 99 }
线程池可选择的饱和策略
AbortPolicy终止策略(默认)
1 /** 2 * A handler for rejected tasks that throws a 3 * {@code RejectedExecutionException}. 4 */ 5 public static class AbortPolicy implements RejectedExecutionHandler { 6 /** 7 * Creates an {@code AbortPolicy}. 8 */ 9 public AbortPolicy() { } 10 11 /** 12 * Always throws RejectedExecutionException. 13 * 14 * @param r the runnable task requested to be executed 15 * @param e the executor attempting to execute this task 16 * @throws RejectedExecutionException always 17 */ 18 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 19 throw new RejectedExecutionException("Task " + r.toString() + 20 " rejected from " + 21 e.toString()); 22 } 23 }
DiscardPolicy抛弃策略
1 /** 2 * A handler for rejected tasks that silently discards the 3 * rejected task. 4 */ 5 public static class DiscardPolicy implements RejectedExecutionHandler { 6 /** 7 * Creates a {@code DiscardPolicy}. 8 */ 9 public DiscardPolicy() { } 10 11 /** 12 * Does nothing, which has the effect of discarding task r. 13 * 14 * @param r the runnable task requested to be executed 15 * @param e the executor attempting to execute this task 16 */ 17 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 18 } 19 }
DiscardOldestPolicy抛弃旧任务策略
1 /** 2 * A handler for rejected tasks that discards the oldest unhandled 3 * request and then retries {@code execute}, unless the executor 4 * is shut down, in which case the task is discarded. 5 */ 6 public static class DiscardOldestPolicy implements RejectedExecutionHandler { 7 /** 8 * Creates a {@code DiscardOldestPolicy} for the given executor. 9 */ 10 public DiscardOldestPolicy() { } 11 12 /** 13 * Obtains and ignores the next task that the executor 14 * would otherwise execute, if one is immediately available, 15 * and then retries execution of task r, unless the executor 16 * is shut down, in which case task r is instead discarded. 17 * 18 * @param r the runnable task requested to be executed 19 * @param e the executor attempting to execute this task 20 */ 21 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 22 if (!e.isShutdown()) { 23 e.getQueue().poll(); 24 e.execute(r); 25 } 26 } 27 }
CallerRunsPolicy调用者运行策略
1 /** 2 * A handler for rejected tasks that runs the rejected task 3 * directly in the calling thread of the {@code execute} method, 4 * unless the executor has been shut down, in which case the task 5 * is discarded. 6 */ 7 public static class CallerRunsPolicy implements RejectedExecutionHandler { 8 /** 9 * Creates a {@code CallerRunsPolicy}. 10 */ 11 public CallerRunsPolicy() { } 12 13 /** 14 * Executes task r in the caller‘s thread, unless the executor 15 * has been shut down, in which case the task is discarded. 16 * 17 * @param r the runnable task requested to be executed 18 * @param e the executor attempting to execute this task 19 */ 20 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 21 if (!e.isShutdown()) { 22 r.run(); 23 } 24 } 25 }
线程池执行示意图
常用线程池
Executors.newCacheThreadPool()
可缓存线程池,先查看池中有没有以前建立的线程,如果有,就直接使用。如果没有,就建一个新的线程加入池中,缓存型池子通常用于执行一些生存期很短的异步型任务
Executors.newFixedThreadPool(int n)
创建一个可重用固定个数的线程池,以共享的无界队列方式来运行这些线程
Executors.newScheduledThreadPool(int n)
创建一个定长线程池,支持定时及周期性任务执行
Executors.newSingleThreadExecutor()
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
向线程池提交任务
1 import org.junit.Test; 2 3 import java.util.concurrent.ExecutionException; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.Future; 7 8 public class RunTest { 9 10 @Test 11 public void submitTest() 12 throws ExecutionException, InterruptedException { 13 14 // 创建线程池 15 ExecutorService threadPool = 16 Executors.newCachedThreadPool(); 17 18 /** 19 * 利用submit方法提交任务,接收任务的返回结果 20 */ 21 Future<Integer> future = threadPool.submit(() -> { 22 Thread.sleep(1000L * 10); 23 24 return 2 * 5; 25 }); 26 27 /** 28 * 阻塞方法,直到任务有返回值后,才向下执行 29 */ 30 Integer num = future.get(); 31 32 System.out.println("执行结果:" + num); 33 } 34 35 @Test 36 public void executeTest() throws InterruptedException { 37 // 创建线程池 38 ExecutorService threadPool = 39 Executors.newCachedThreadPool(); 40 41 /** 42 * 利用execute方法提交任务,没有返回结果 43 */ 44 threadPool.execute(() -> { 45 try { 46 Thread.sleep(1000L * 10); 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 51 Integer num = 2 * 5; 52 System.out.println("执行结果:" + num); 53 }); 54 55 56 57 Thread.sleep(1000L * 1000); 58 } 59 60 }
线程池的状态
以上是关于12-线程池的主要内容,如果未能解决你的问题,请参考以下文章