JUC学习之共享模型之工具上之线程池浅学

Posted 大忽悠爱忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC学习之共享模型之工具上之线程池浅学相关的知识,希望对你有一定的参考价值。


线程池

1. 自定义线程池


步骤1:自定义拒绝策略接口

package Pool;



@FunctionalInterface // 拒绝策略
interface RejectPolicy<T>

 void reject(BlockingQueue<T> queue, T task);

步骤2:自定义任务队列

package Pool;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

//阻塞队列
//TODO:泛型可以扩展阻塞队列的扩展性
@Slf4j
public class BlockingQueue<T>

    //1.任务队列
    private Deque<T> deque=new ArrayDeque<>();

    //2.锁
    private ReentrantLock lock=new ReentrantLock();

    //3.生产者条件变量
    private Condition fullWaitSet=lock.newCondition();

    //4.消费者条件变量
    private Condition emptyWaitSet=lock.newCondition();

    //5.容量
    private int capcity;

    public BlockingQueue(int capcity)
    
        this.capcity = capcity;
    

    //带超时的阻塞获取----从队列头部获取一个任务
    public T poll(long timeout, TimeUnit timeUnit)
    
        //先上锁
        lock.lock();
       try 
           //将timeout统一转换为纳秒
           long nanos = timeUnit.toNanos(timeout);
           //TODO:队列为空陷入超时等待,否则返回一个任务
           while(deque.isEmpty())
           
               try 
                   //等待超时
                   if(nanos<=0)
                   
                       return null;
                   
                   //返回剩余等待时间
                   //TODO:这里之所以会返回一个剩余时间,是因为存在被唤醒后抢不到锁的可能,因此会再次陷入休眠等待
                   nanos = emptyWaitSet.awaitNanos(nanos);
               
               catch (InterruptedException e)
               
                   e.printStackTrace();
               
           
           //返回任务
           T first = deque.removeFirst();
           //唤醒等待中的生产者线程
           fullWaitSet.signal();
           return first;
       
       finally
       
           //解锁
           lock.unlock();
       
    

    //阻塞获取任务---无限等待,直到被唤醒
    public T take()
    
        //加锁
        lock.lock();
        try
        
            //当前没有任务
           while(deque.isEmpty())
           
               try
               
                emptyWaitSet.await();
                catch (InterruptedException e) 
                   e.printStackTrace();
               
           
             //返回任务
            T first = deque.removeFirst();
            //唤醒等待中的生产者线程
           fullWaitSet.signal();
           return first;
        
        finally
        
            //解锁
            lock.unlock();
        
    

    //阻塞添加
    public void put(T task)
    
        lock.lock();
        try
        
            //队列满了
         while(deque.size()==capcity)
         
             try
             
                 log.debug("等待加入任务队列 ...",task);
                 fullWaitSet.await();
              catch (InterruptedException e) 
                 e.printStackTrace();
             
         
         //当前队列未满
         log.debug("加入队列 ",task);
         //加入队列尾部
         deque.addLast(task);
         //唤醒等待中的消费者线程
         emptyWaitSet.signal();
        
        finally
        
            lock.unlock();
        
    

   //带超时的阻塞添加
   public boolean offer(T task,long timeout,TimeUnit timeUnit)
   
       lock.lock();
       try
           long nanos = timeUnit.toNanos(timeout);
           while(deque.size()==capcity)
           
               if(nanos<=0)
                   return false;
               log.debug("等待加入的任务队列  ...",task);
               try 
                   nanos = fullWaitSet.awaitNanos(nanos);
                catch (InterruptedException e) 
                   e.printStackTrace();
               
           
           log.debug("加入任务队列 ",task);
           deque.addLast(task);
           emptyWaitSet.signal();
           return true;
        finally 
           lock.unlock();
       
   

   public int size()
   
       lock.lock();
       try
           return deque.size();
       
       finally 
           lock.unlock();
       
   

   public void tryPut(RejectPolicy<T> rejectPolicy,T task)
   
       lock.lock();
       try
       
         //判断队列是否满了
         if(deque.size()==capcity)
         
             //采用指定的拒绝策略
             rejectPolicy.reject(this,task);
         else
         
             //有空闲
             log.debug("加入任务队列 ",task);
             deque.addLast(task);
             emptyWaitSet.signal();
         
       finally 
           lock.unlock();
       
   


步骤3:自定义线程池

package Pool;



import lombok.extern.slf4j.Slf4j;

import java.util.HashSet;
import java.util.concurrent.TimeUnit;

//线程池
@Slf4j
public class ThreadPool

    //任务队列
    private BlockingQueue<Runnable> taskQueue;

    //线程集合
    private HashSet<Worker> workers=new HashSet<>();

    //核心线程数
    private int coreSize;

    //获取任务的超时时间
    private long timeout;

    //超时时间单位
    private TimeUnit timeUnit;

     //拒绝策略
    private RejectPolicy<Runnable> rejectPolicy;

    //执行任务
    public void execute(Runnable task)
    
        //当任务数没有超过coreSize时,直接交给worker对象执行
        //如果任务数超过coreSize时,加入任务队列暂存
        if(workers.size()<coreSize)
        
           Worker worker=new Worker(task);
           log.debug("新增worker,",worker,task);
           workers.add(worker);
           worker.start();
        
        else
        
        //    taskQueue.put(task);
            // 1) 死等
            // 2) 带超时等待
            // 3) 让调用者放弃任务执行
            // 4) 让调用者抛出异常
            // 5) 让调用者自己执行任务
            taskQueue.tryPut(rejectPolicy, task);
        
    

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,
                      RejectPolicy<Runnable> rejectPolicy) 
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    

    public class Worker extends Thread
    
        private Runnable task;
        public Worker(Runnable task) 
            this.task = task;
        
        @Override
        public void run() 
            // 执行任务
            // 1) 当 task 不为空,执行任务
            // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
            // while(task != null || (task = taskQueue.take()) != null) 
            while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) 
                try 
                    log.debug("正在执行...", task);
                    task.run();
                 catch (Exception e) 
                    e.printStackTrace();
                 finally 
                    task = null;
                
            
            synchronized (workers) 
                log.debug("worker 被移除", this);
                workers.remove(this);
            
        
    


步骤4:测试

package Pool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j
public class Test

    public static void main(String[] args) 
        ThreadPool threadPool = new ThreadPool(1,
                1000, TimeUnit.MILLISECONDS, 1, (queue, task)->
            // 1. 死等
// queue.put(task);
            // 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);
            // 3) 让调用者放弃任务执行
// log.debug("放弃", task);
            // 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
            // 5) 让调用者自己执行任务
            task.run();
        );
        for (int i = 0; i < 4; i++) 
            int j = i;
            threadPool.execute(() -> 
                try 
                    Thread.sleep(1000L);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                log.debug("", j);
            );
        
    



ThreadPoolExecutor

  • 线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量


从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

最高位是符号位,1表示负数,0表示整数

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));

// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc)  return rs | wc; 

  • 构造方法
public ThreadPoolExecutor(int corePoolSize,
 int maximumPoolSize,
 long keepAliveTime,
 TimeUnit unit,
 BlockingQueue<Runnable> workQueue,
 ThreadFactory threadFactory,
 RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目 (最多保留的线程数)
  • maximumPoolSize 最大线程数目
  • keepAliveTime 生存时间 - 针对救急线程
  • unit 时间单位 - 针对救急线程
  • workQueue 阻塞队列
  • threadFactory 线程工厂 - 可以为线程创建时起个好名字
  • handler 拒绝策略

工作方式:


  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排
    队,直到有空闲的线程。
  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线
    程来救急。
  • 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它
    著名框架也提供了实现
  • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
  • CallerRunsPolicy 让调用者运行任务
  • DiscardPolicy 放弃本次任务
  • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
  • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方
    便定位问题
  • Netty 的实现,是创建一个新线程来执行任务
  • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
  • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
  • 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由
    keepAliveTime 和 unit 来控制


根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池


newFixedThreadPool----固定大小的线程池

    public static ExecutorService newFixedThreadPool(int nThreads) 
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    

可以看到newFixedThreadPool底层还是通过ThreadPoolExecutor的构造参数传递不同参数实现

注意这里ThreadPoolExecutor返回的是线程池对象

特点

  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务

评价 适用于任务量已知,相对耗时的任务

package Pool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class Test

    public static void main(String[] args)
    
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.execute(()->
            log.debug("线程名称: ",Thread.currentThread().getName());
            log.debug(以上是关于JUC学习之共享模型之工具上之线程池浅学的主要内容,如果未能解决你的问题,请参考以下文章

JUC学习之共享模型上

线程池浅学

JUC学习之共享模型之内存

JUC学习之共享模型之内存

JUC - 共享模型之工具 - 第六篇

JUC学习之预热知识