线程池
Posted pjqq
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池相关的知识,希望对你有一定的参考价值。
要求:
JavaSE阶段性测试1:手写线程池需求
需求背景:由于频繁的自行创建和销毁线程非常的消耗资 源,并且难以控制线程的并发数量,所以项目组急需编写 一个线程池功能,能够自行管理线程的生命周期,并且根 据繁忙线程池程度对池中线程进行动态扩容。
现在项目经理抽象出线程池对象中包含有几个主要特性
1. 可以根据任务数量进行线程数量扩容,在空闲时线程
池中只保存coreSize数量的线程,而当coreSize个线程已 经全部在工作时,这个时候如果再添加任务进来,会新建 线程,接受任务,直到池中的线程数量达到maxSize时, 不在新建线程
2. 当线程数量达到maxSize时,并且所有线程都处于工
作状态,这时如果还有任务添加过来,则将其保存在一个 队列中(可以自定义队列,也可以自行调研并使用jdk中已 有的Queue子类)
3. 如果队列中的任务数量已经达到了最大值(队列满
了),这个时候又有任务添加过来,则执行丢弃策略(可 以什么都不做,也可以仍出异常,也可以让主线程中直接 调用run处理掉等等,最好使用策略模式而不是if-else)
4. 定义线程的空闲时间,如果线程池处理完了任务,线 程空闲了,则根据已定义新增线程的空闲时间,如果时间达到了,则回收线程直到线程数量达到coreSize数量为 止。(注意回收等操作的原子性,防止数据不一致)
思路,允许先在一个类中完成所有功能,再对代码进行面 向对象重构 也可以直接研读jdk线程池源码,参照写一份也可以(不建 议) 编写测试类,测试你的线程池是否功能正常
package com.test4; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; /** * @Author:pjq * @Date: 2019/9/23 9:30 */ public class SimpleThreadPool extends Thread /** * 当前线程队列的大小 */ private int size; /** * 任务队列大小 */ private final int queueSize; /** * 默认任务队列大小 */ private final static int TASK_QUEUE_SIZE = 200; /** * 任务队列 正在执行任务的线程 */ private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>(); /** * 线程队列,可以取出空闲线程 */ private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>(); /** * 线程ID */ private static volatile int seq = 0; /** * 线程池是否被销毁 */ private volatile boolean destroyed = false; private int coreSize; private int maxSize; private int active; //队列满了丢异常 private DiscardPolicy discardPolicy = new MyAbortPolicy(); class MyAbortPolicy implements DiscardPolicy @Override public void discard(String name) throws RuntimeException throw new RuntimeException("超出任务...") ; interface DiscardPolicy void discard(String name) throws RuntimeException; public SimpleThreadPool() this(4, 8, 12, TASK_QUEUE_SIZE); public SimpleThreadPool(int coreSize, int active, int maxSize, int queue_size) this.queueSize = queue_size; this.coreSize = coreSize; this.maxSize = maxSize; this.active = active; init(); /** * 首先创建最小数量的线程池 */ private void init() for (int i = 0; i < coreSize; i++) createWorkTask(); this.size = coreSize; this.start(); /** * 创建空闲线程 加入线程池 */ private void createWorkTask() WorkerTask task = new WorkerTask("pool_" + (seq++)); task.start(); THREAD_QUEUE.add(task);//添加任务 /** * 向外提供方法 提交任务,如果任务大小超过线程池大小 200 则直接抛弃 */ public void submit(Runnable runnable) synchronized (TASK_QUEUE) if (TASK_QUEUE.size() > queueSize) discardPolicy.discard("抛弃任务。。"); TASK_QUEUE.addLast(runnable); TASK_QUEUE.notifyAll(); /** * 监控线程池,进行动态的扩容和缩小 */ @Override public void run() while (!destroyed) try TimeUnit.MILLISECONDS.sleep(1000); System.err.printf("pool Min:%d,Active:%d,Max:%d,current:%d,QueueSize:%d\\n", coreSize, active, maxSize, size, TASK_QUEUE.size()); catch (InterruptedException e) e.printStackTrace(); try Thread.sleep(5000); //判断当队列大小大于活动大小 创建任务(第一次扩容) if (TASK_QUEUE.size() > active && size < active) for (int i = size; i < active; i++) createWorkTask(); System.out.println("active 池自动调整为 " + active); size = active; //判断当线程任务大小大于最大线程大小时,创建任务(创建任务) else if (TASK_QUEUE.size() > maxSize && size < maxSize) for (int i = size; i < maxSize; i++) createWorkTask(); System.out.println("max 池自动调整为 " + maxSize); size = maxSize; synchronized (THREAD_QUEUE) if (TASK_QUEUE.isEmpty() && size > active) int release = size - active; for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator(); it.hasNext(); ) if (release < 0) break; WorkerTask task = it.next(); task.interrupt(); it.remove(); release--; size = active; System.out.println("池释放大小为" + size); catch (InterruptedException e) e.printStackTrace(); /** * 任务有四种状态 */ private enum TaskState FREE, RUNNING, BLOCKED, DEAD /** * 内部类 将runnable封装为task执行 */ private static class WorkerTask extends Thread private volatile TaskState taskState = TaskState.FREE; public WorkerTask(String name) super(name); public TaskState getTaskState() return taskState; /** * 运行任务队列中的任务 */ @Override public void run() //循环标记 OUTER: while (this.taskState != TaskState.DEAD) Runnable runnable; synchronized (TASK_QUEUE) while (TASK_QUEUE.isEmpty()) try taskState = TaskState.BLOCKED; TASK_QUEUE.wait(); catch (InterruptedException e) break OUTER; runnable = TASK_QUEUE.removeFirst(); if (runnable != null) taskState = TaskState.RUNNING; runnable.run(); taskState = TaskState.FREE; public static void main(String[] args) throws InterruptedException SimpleThreadPool threadPool = new SimpleThreadPool(); IntStream.range(0, 200).forEach(i -> threadPool.submit(() -> System.out.println("任务" + Thread.currentThread().getName() + " 接收... " + i); try Thread.sleep(1000); catch (InterruptedException e) e.printStackTrace(); System.out.println("任务" + Thread.currentThread().getName() + " 关闭... " + i); )); //threadPool.shutDown(); /* for ( int i = 0; i < 203; i++) threadPool.submit(()-> System.out.println("运行的任务池" + Thread.currentThread().getName()+" "); try Thread.sleep(1000); catch (InterruptedException e) e.printStackTrace(); System.out.println("运行任务" + Thread.currentThread().getName() + "关"); ); // System.out.println("----------------------------"); */
以上是关于线程池的主要内容,如果未能解决你的问题,请参考以下文章