线程池浅学
Posted z啵唧啵唧
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池浅学相关的知识,希望对你有一定的参考价值。
文章目录
线程池
自定义线程池
- 自定义线程池(没有等待时限,当我们的任务执行完毕之后线程还在继续等待)
package com.zb.juc.test;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description:
* @Author:啵啵啵啵啵啵唧~~~
* @Date:2022/4/23
*/
@Slf4j(topic = "c.TestPool")
public class TestPool
public static void main(String[] args)
ThreadPool threadPool=new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10);
for (int i = 0; i <5 ; i++)
int j = i;
threadPool.execute(()->
log.debug("",j);
);
@Slf4j(topic = "c.ThreadPool")
class ThreadPool
/**
* 任务队列
*/
private BlockingQueue<Runnable> taskQueue;
/**
* 线程集合
*/
private HashSet<Worker> workers = new HashSet<>();
/**
* 核心线程数
*/
private int coreSize;
/**
* 获取任务的超时时间,过了超时间还没有任务可以让线程停止
*/
private long timeout;
/**
* 时间单位
*/
private TimeUnit timeUnit;
/**
* 初始化
* @param coreSize
* @param timeout
* @param timeUnit
* @param queueCapacity
*/
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity)
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
/**
* 用来执行任务
* @param task
*/
public void execute(Runnable task)
synchronized (workers)
//当任务数量没有超过核心线程数的时候,直接交给worker执行
if (workers.size()<coreSize)
Worker worker = new Worker(task);
log.debug("新增worker,",worker,task);
workers.add(worker);
worker.start();
else
//如果任务数超过coreSize时,加入任务队列暂存
log.debug("加入任务队列",task);
taskQueue.put(task);
/**
* 封装线程对象
*/
class Worker extends Thread
private Runnable task;
public Worker(Runnable task)
this.task=task;
@Override
public void run()
//执行任务
//1、当task不为空,执行任务
//当task执行完毕,再接着从任务队列中获取任务并执行
while(task!=null||(task = taskQueue.take())!=null)
// while(task!=null||(task = taskQueue.poll(timeout,timeUnit))!=null)
try
log.debug("正在执行",task);
task.run();
catch (Exception e)
e.printStackTrace();
finally
//执行完毕之后task就可以置为null
task=null;
//一旦退出了循环因该
synchronized (workers)
log.debug("worker被移除",this);
workers.remove(this);
/**
* 定义一个阻塞队列
* @param <T>
*/
class BlockingQueue<T>
/**
* 1、任务队列
*/
private Deque<T> queue = new ArrayDeque<>();
/**
* 2、锁
*/
private ReentrantLock lock = new ReentrantLock();
/**
* 3、当任务队列当中没有任务的时候,消费者就进入等待状态,
* 这时候就需要一个消费者的条件变量
*/
private Condition emptyWaitSet = lock.newCondition();
/**
* 4、生产者也就需要一个条件变量
*/
private Condition fullWaitSet = lock.newCondition();
/**
* 5、阻塞队列的容量上限
*/
private int capcity;
/**
* 没有时间限制的阻塞获取
* @return
*/
public T take()
lock.lock();
try
while(queue.isEmpty())
try
//当任务队列为空,消费者就没有任务可以消费,那么就进入等待的状态
emptyWaitSet.await();
catch (InterruptedException e)
e.printStackTrace();
//此时任务队列不为空,我们取处任务队列当中队头的任务返回
T t = queue.removeFirst();
//当从任务队列当中取处一个任务的时候,任务队列就有空位了,就可以唤醒因为队列满了而等待的生产者
fullWaitSet.signal();
return t;
finally
lock.unlock();
/**
* 阻塞添加
* @param element
*/
public void put(T element)
lock.lock();
try
while(queue.size()==capcity)
//判断队列是否已满,满了的话生产者进入等待
try
fullWaitSet.await();
catch (InterruptedException e)
e.printStackTrace();
//当有空位的时候,将新的任务放到队列的尾部
queue.addLast(element);
//添加完新的元素之后,需要唤醒等待当中的消费者队列,因为有新的任务进队列了
emptyWaitSet.signal();
finally
lock.unlock();
/**
* 有时间限制的阻塞获取
* @param timeout
* @param unit
* @return
*/
public T poll(long timeout , TimeUnit unit)
lock.lock();
try
//将timeout时间转换成纳秒
long nanos = unit.toNanos(timeout);
while(queue.isEmpty())
try
//存在虚假唤醒的情况,所以可以拿到awaitNanos的返回值就是剩余的时间
if (nanos<=0)
return null;
nanos = emptyWaitSet.awaitNanos(nanos);
catch (InterruptedException e)
e.printStackTrace();
//此时任务队列不为空,我们取处任务队列当中队头的任务返回
T t = queue.removeFirst();
//当从任务队列当中取处一个任务的时候,任务队列就有空位了,就可以唤醒因为队列满了而等待的生产者
fullWaitSet.signal();
return t;
finally
lock.unlock();
/**
* 获取阻塞队列的大小
* @return
*/
public int size()
lock.lock();
try
return queue.size();
finally
lock.unlock();
public BlockingQueue(int capacity)
this.capcity = capacity;
- 如果想要实现有时限的等待,只需要调用poll方法进行实现即可,因为在poll方法,我们添加了这个执行时间的控制机制
@Override
public void run()
//执行任务
//1、当task不为空,执行任务
//当task执行完毕,再接着从任务队列中获取任务并执行
// while(task!=null||(task = taskQueue.take())!=null)
while(task!=null||(task = taskQueue.poll(timeout,timeUnit))!=null)
try
log.debug("正在执行",task);
task.run();
catch (Exception e)
e.printStackTrace();
finally
//执行完毕之后task就可以置为null
task=null;
//一旦退出了循环因该
synchronized (workers)
log.debug("worker被移除",this);
workers.remove(this);
- 当任务数量大于我们的任务队列的容量的时候,需要我们自定义添加拒绝策略
JDK提供的线程池
线程池的状态
- ThreadPoolExecutor 使用int的高三位来表示线程池的状态,低29位来表示线程的数量
状态名 | 高三位 | 接受新任务吗? | 处理阻塞队列任务吗? | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | 不会再接受新的任务,但是会将阻塞队列当中的剩余任务处理完毕 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列任务 |
TIDYING | 010 | - | - | 任务全部执行完毕,活动线程为0即将进入终结 |
TERMINATED | 011 | - | - | 终结状态 |
- 从数字上进行比较 TERMINATED>TIDYING>STOP>SHUTDOWN>RUNNING ==注意:==为啥running是111还是最小呢,因为状态用的是int的高三位,有符号整数,最高位为1表示负数,所以running是最小的。
线程池的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize 核心线程数(最多保留的线程数)
- maximumPoolSize 最大线程数
- keepAliveTime 生存时间 ->针对救急线程 救急线程数就等于最大线程数减去核心线程数
- unit 时间单位 ->针对救急线程
- workQueue 阻塞队列
- threadFactory 线程工厂 可以为线程创建起名 这样可以有效避免线程自己起名没有啥规律
- handler 这个就是拒绝策略
工作方式
- 是这样的,就是正常的先分配线程嘛,每提交一个任务的时候就分配一个线程去执行他,当线程数目达到了核心线程的数之后,再提交的线程就会添加到这个阻塞对列当中,当这个阻塞队列被添加满了的时候,不会先去立即执行这个拒绝策略,而是启动这个紧急线程来直接执行这个新添加进来的任务,当这个紧急线程都被用完了之后,实在是没有办法执行这个任务了,那么就会采用这个拒绝策略。咱就是说刚好等到这个紧急线程的开启的任务实在是比较幸运的。来得早不如来得巧。
- 我们要知道的是只有有界对列才会创建(最大线程数-核心线程数)个紧急线程,如果对列不是一个有界的对列不存在核心线程数这个说法
JDK线程池的拒绝策略
- AbortPolicy:让调用者抛出RejectedExecutionException异常,这是默认的拒绝策略
- CallerRunsPolicy:让调用者自行运行任务
- DiscardPolicy:放弃本次任务
- DiscardOldestPolicy放弃队列当中最早的任务,本任务取而代之
一些常见框架实现拒绝策略的方式
- Dubbo的实现,在抛出RejectedExecutionException异常之前会记录日志,并dump线程栈信息,方便定位问题
- Neety的实现,是创建一个新的线程执行任务
- ActiveMQ的实现,带超时等待60s尝试放入对列,类似我们之前自定义的拒绝策略
- PinPoint的实现,它使用了一个拒绝策略链,会逐一尝试策略链中的每种拒绝策略
EXecutors
固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads)
//从这里我们可以看出核心线程数等于最大线程数,所以固定大小的这种创建线程池的方式没有核心线程
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
特点
-
核心线程数等于最大线程数,不能存在紧急线程
-
因为都是核心线程,任务执行完毕之后依然不会结束
-
说明一下这个打印名称:由线程工厂实现的
DefaultThreadFactory()
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
- 当然我们也可以在实现线程池的到时候自己是实现一个线程工程,来制定我们子的命名规则
- 上面的代码可以改造成这个样子,再创建线程池的时候传入自己实现的一个线程工厂
@Slf4j(topic = "c.Test18")
public class Test18
public static void main(String[] args)
ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory()
//创建一个原子类的整形用来设置线程名字的编号
private AtomicInteger t = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r)
return new Thread(r,"myPool_t"+t.getAndIncrement());
);
pool.execute(()->
log.debug("1");
);
pool.execute(()->
log.debug("2");
);
pool.execute(()->
log.debug("3");
);
- 发现名字可以改过来啦
带缓冲功能的线程池
public static ExecutorService newCachedThreadPool()
//从这个构造方法可以看出,带缓冲的这个线程池它没有核心线程,全部都是紧急线程,只要在整数的最大范围内部创建的都是紧急线程
//通过他的这个时间上可以看出紧急线程的时间为60s
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
//他这个对列还是蛮特殊的
new SynchronousQueue<Runnable>());
- SynchronousQueue()带缓冲的线程池他的这个对列是没有容量的采用的是一手交钱一手交货的模式,也就是说只有当存在线程想要从这个队列当中拿任务的时候我们才往这个对列中添加任务,是一对一的。
评价
- 整个线程池表现为线程数会根据任务量不断地增长,没有上限,当任务执行完毕,空闲一分钟后释放线程。
- 适合任务数量比较密集,但是每个任务执行时间比较短的情况。
单线程线程池
public static ExecutorService newSingleThreadExecutor()
return new FinalizableDelegatedExecutorService
//核心线程和总线程数都是1
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
//给的是一个无界的阻塞对列
new LinkedBlockingQueue<Runnable>()));
- 使用场景:希望多个线程排队执行。线程数固定为1,任务多余1时,会放入无界队列当中,任务对列执行完毕,这唯一的线程也不会被释放。
区别:
- 自己创建一个单线程串行的执行任务,如果任务执行失败而终止的化没有任何补救措施,而像这个单线程的线程池当任务执行失败之后还会创建新的一个线程来执行任务,保证线程池的正常工作
- 单线程池应用了装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中特有的方法4
- 上面这个固定大小的线程池,当我们指定线程数为1的时候,类似于这个单线程的线程池,但是这个固定大小的线程池返回的是ThreadPoolExecutor对象,可以在强转之后调用setCorePoolSize等方法进行修改。
提交任务
- 上面介绍到execute方法可以用来执行任务
void execute(Runnable command);
- 提交任务tasks,用返回值Future获得执行结果
<T> Future<T> submit(Callable<T> task);
@Slf4j(topic = "c.Test19")
public class Test19
public static void main(String[] args) throws ExecutionException, InterruptedException
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<String> future = pool.submit(new Callable<String>()
@Override
public 线程池浅谈