JUC - 共享模型之工具 - 第六篇

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC - 共享模型之工具 - 第六篇相关的知识,希望对你有一定的参考价值。

六、共享模型之工具

1. 线程池

1.1 自定义线程池

步骤1:自定义拒绝策略接口

@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> 
    void reject(BlockingQueue<T> queue, T task);

步骤2:自定义任务队列

class BlockingQueue<T> 
    // 1. 任务队列
    private Deque<T> queue = new ArrayDeque<>();
    // 2. 锁
    private ReentrantLock lock = new ReentrantLock();
    // 3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();
    // 5. 容量
    private int capcity;
    public BlockingQueue(int capcity) 
        this.capcity = capcity;
    
    // 带超时阻塞获取
    public T poll(long timeout, TimeUnit unit) 
        lock.lock();
        try 
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) 
                try 
                    // 返回值是剩余时间
                    if (nanos <= 0) 
                        return null;
                    
                    nanos = emptyWaitSet.awaitNanos(nanos);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
         finally 
            lock.unlock();
        
    
    // 阻塞获取
    public T take() 
        lock.lock();
        try 
            while (queue.isEmpty()) 
                try 
                    emptyWaitSet.await();
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
         finally 
            lock.unlock();
        
    
    // 阻塞添加
    public void put(T task) 
        lock.lock();
        try 
            while (queue.size() == capcity) 
                try 
                    log.debug("等待加入任务队列  ...", task);
                    fullWaitSet.await();
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
            log.debug("加入任务队列 ", task);
            queue.addLast(task);
            emptyWaitSet.signal();
         finally 
            lock.unlock();
        
    
    // 带超时时间阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) 
        lock.lock();
        try 
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capcity) 
                try 
                    if(nanos <= 0) 
                        return false;
                    
                    log.debug("等待加入任务队列  ...", task);
                    nanos = fullWaitSet.awaitNanos(nanos);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
            log.debug("加入任务队列 ", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
         finally 
            lock.unlock();
        
    
    public int size() 
        lock.lock();
        try 
            return queue.size();
         finally 
            lock.unlock();
        
    
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) 
        lock.lock();
        try 
            // 判断队列是否满
            if(queue.size() == capcity) 
                rejectPolicy.reject(this, task);
             else  // 有空闲
                log.debug("加入任务队列 ", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            
         finally 
            lock.unlock();
        
    

步骤3:自定义线程池

class ThreadPool 
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;
    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();
    // 核心线程数
    private int coreSize;
    // 获取任务时的超时时间
    private long timeout;
    private TimeUnit timeUnit;
    private RejectPolicy<Runnable> rejectPolicy;
    // 执行任务
    public void execute(Runnable task) 
        // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
        // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers) 
            if(workers.size() < coreSize) 
                Worker worker = new Worker(task);
                log.debug("新增 worker, ", worker, task);
                workers.add(worker);
                worker.start();
             else 
                // taskQueue.put(task);
                // 1) 死等
                // 2) 带超时等待
                // 3) 让调用者放弃任务执行
                // 4) 让调用者抛出异常
                // 5) 让调用者自己执行任务
                taskQueue.tryPut(rejectPolicy, task);
            
        
    
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,
                      RejectPolicy<Runnable> rejectPolicy) 
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    
    class Worker extends Thread
        private Runnable task;
        public Worker(Runnable task) 
            this.task = task;
        
        @Override
        public void run() 
            // 执行任务
            // 1) 当 task 不为空,执行任务
            // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
            // while(task != null || (task = taskQueue.take()) != null) 
            while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) 
                try 
                    log.debug("正在执行...", task);
                    // task的run 方法跟 work 的run 是不同的,task的run 在外部 重写了
                    task.run(); 
                 catch (Exception e) 
                    e.printStackTrace();
                 finally 
                    task = null;
                
            
            synchronized (workers) 
                log.debug("worker 被移除", this);
                workers.remove(this);
            
        
    

步骤4:测试

public static void main(String[] args) 
    ThreadPool threadPool = new ThreadPool(1,
                                           1000, TimeUnit.MILLISECONDS, 1, (queue, task)->
                                               // 1. 死等
                                               // queue.put(task);
                                               // 2) 带超时等待
                                               // queue.offer(task, 1500, TimeUnit.MILLISECONDS);
                                               // 3) 让调用者放弃任务执行
                                               // log.debug("放弃", task);
                                               // 4) 让调用者抛出异常
                                               // throw new RuntimeException("任务执行失败 " + task);
                                               // 5) 让调用者自己执行任务
                                               task.run();
                                           );
    for (int i = 0; i < 4; i++) 
        int j = i;
        threadPool.execute(() -> 
            try 
                Thread.sleep(1000L);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            log.debug("", j);
        );
    

1.2 ThreadPoolExecutor

(1) 线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

状态名 高 3 位 接收新任 务 处理阻塞队列任 务 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余 任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列 任务
TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入 终结
TERMINATED 011 - - 终结状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc)  return rs | wc; 

(2) 构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目 (最多保留的线程数)
  • maximumPoolSize 最大线程数目
  • keepAliveTime 生存时间 - 针对救急线程
  • unit 时间单位 - 针对救急线程
  • workQueue 阻塞队列
  • threadFactory 线程工厂 - 可以为线程创建时起个好名字
  • handler 拒绝策略

工作方式:

graph LR

subgraph 阻塞队列
size=2
t3(任务3)
t4(任务4)
end

subgraph 线程池c=2,m=3
ct1(核心线程1)
ct2(核心线程2)
mt1(救急线程)
ct1 --> t1(任务1)
ct2 --> t2(任务2)
end



style ct1 fill:#ccf,stroke:#f66,stroke-width:2px
style ct2 fill:#ccf,stroke:#f66,stroke-width:2px
style mt1 fill:#ccf,stroke:#f66,stroke-width:2px,stroke-dasharray: 5.5
  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
  • 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
    • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
    • CallerRunsPolicy 让调用者运行任务
    • DiscardPolicy 放弃本次任务
    • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
    • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
    • Netty 的实现,是创建一个新线程来执行任务
    • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
    • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
  • 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。

根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池

(3) newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) 
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

特点

  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务

(4) newCachedThreadPool

public static ExecutorService newCachedThreadPool() 
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());

特点

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
    • 全部都是救急线程(60s 后可以回收)
      • 救急线程可以无限创建
    • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交 货)
SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> 
    try 
        log.debug("putting  ", 1);
        integers.put(1);
        log.debug(" putted...", 1);
        log.debug("putting... ", 2);
        integers.put(2);
        log.debug(" putted...", 2);
     catch (InterruptedException e) 
        e.printStackTrace();
    
,"t1").start();
sleep(1);
new Thread(() -> 
    try 
        log.debug("taking ", 1);
        integers.take();
     catch (InterruptedException e) 
        e.printStackTrace();
    
,"t2").start();
sleep(1);
new Thread(() -> 
    try 
        log.debug("taking ", 2);
        integers.take();
     catch (InterruptedException e) 
        e.printStackTrace();
    
,"t3").start();

输出

11:48:15.500 c.TestSynchronousQueue [t1] - putting 1
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted...
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...

(5) newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() 
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));

使用场景:

希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
    • FinalizableDelegatedExecutorService 应用的是装饰器模式(FinalizableDelegatedExecutorService 是具体装饰角色,DelegatedExecutorService 是抽象装饰角色,ExecutorService 是抽象构件角色),只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
    • 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

(6) 提交任务

// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
    throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

execute和submit的区别

  • execute只能提交Runnable类型的任务,无返回值。submit既可以提交Runnable类型的任务,也可以提交Callable类型的任务,会有一个类型为Future的返回值,但当任务类型为Runnable时,返回值为null。
  • execute在执行任务时,如果遇到异常会直接抛出,而submit不会直接抛出,只有在使用Future的get方法获取返回值时,才会抛出异常。
  • submit 执行后会立即返回一个结果,其他线程可以将返回的对象调用其方法get() 获取将来的结果,会阻塞等待(任务未执行完成)。

(7) 关闭线程池

shutdown

/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
public void shutdown() 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try 
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(SHUTDOWN);
        // 仅会打断空闲线程
        interruptIdleWorkers();
        onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
     finally 
        mainLock.unlock();
    
    // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
    tryTerminate();
 

shutdownNow

/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() 
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try 
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(STOP);
        // 打断所有线程
        interruptWorkers();
        // 获取队列中剩余任务
        tasks = drainQueue();
     finally 
        mainLock.unlock();
    
    // 尝试终结
    tryTerminate();
    return tasks;

其它方法

// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
情,可以利用此方法等待
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

* 异步模式之工作线程

定义

让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现 就是线程池,也体现了经典设计模式中的享元模式。

例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那 么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)

注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率

例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成 服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工

饥饿

固定大小线程池会有饥饿现象

  • 两个工人是同一个线程池中的两个线程
  • 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
    • 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
    • 后厨做菜:没啥说的,做就是了
  • 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
  • 但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿
public class TestDeadLock 
    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random RANDOM = new Random();
    static String cooking() 
        return MENU.get(RANDOM.nextInt(MENU.size()));
    
    public static void main(String[] args) 
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> 
            log.debug("处理点餐...");
            Future<String> f = executorService.submit(() -> 
                log.debug("做菜");
                return cooking();
            );
            try 
                log.debug("上菜: ", f.get());
             catch (InterruptedException | ExecutionException e) 
                e.printStackTrace();
            
        );
        /*
        executorService.execute(() -> 
            log.debug("处理点餐...");
            Future<String> f = executorService.submit(() -> 
                log.debug("做菜");
                return cooking();
            );
            try 
                log.debug("上菜: ", f.get());
             catch (InterruptedException | ExecutionException e) 
                e.printStackTrace();
            
        );
        */
    

输出

17:21:27.883 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:21:27.891 c.TestDeadLock [pool-1-thread-2] - 做菜
17:21:27.891 c.TestDeadLock [pool-1-thread-1] - 上菜: 烤鸡翅

当注释取消后,可能的输出

17:08:41.339 c.TestDeadLock [pool-1-thread-2] - 处理点餐...
17:08:41.339 c.TestDeadLock [pool-1-thread-1] - 处理点餐...

解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程池,例如:

public class TestDeadLock 
    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random RANDOM = new Random();
    static String cooking() 
        return MENU.get(RANDOM.nextInt(MENU.size()));
    
    public static void main(String[] args) 
        ExecutorService waiterPool = Executors.newFixedThreadPool(1);
        ExecutorService cookPool = Executors.newFixedThreadPool(1);
        waiterPool.execute(() -> 
            log.debug("处理点餐...");
            Future<String> f = cookPool.submit(() -> 
                log.debug("做菜");
                return cooking();
            );
            try 
                log.debug("上菜: ", f.get());
             catch (InterruptedException | ExecutionException e) 
                e.printStackTrace();
            
        );
        waiterPool.execute(() -> 
            log.debug("处理点餐...");
            Future<String> f = cookPool.submit(() -> 
                log.debug("做菜");
                return cooking();
            );
            try 
                log.debug("上菜: ", f.get());
             catch (InterruptedException | ExecutionException e) 
                e.printStackTrace();
            
        );
    

输出

17:25:14.626 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:25:14.630 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.631 c.TestDeadLock [pool-1-thread-1] - 上菜: 地三鲜
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:25:14.632 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 上菜: 辣子鸡丁
创建多少线程池合适
  • 过小会导致程序不能充分地利用系统资源、容易导致饥饿
  • 过大会导致更多的线程上下文切换,占用更多内存
CPU 密集型运算

通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

I/O 密集型运算

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下

线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间

  • 例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式 4 * 100% * 100% / 50% = 8
  • 例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式 4 * 100% * 100% / 10% = 40
自定义线程池

(8) 任务调度线程池

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

public static void main(String[] args) 
    Timer timer = new Timer();
    TimerTask task1 = new TimerTask() 
        @Override
        public void run() 
            log.debug("task 1");
            sleep(2);
        
    ;
    TimerTask task2 = new TimerTask() 
        @Override
        public void run() 
            log.debug("task 2");
        
    ;
    // 使用 timer 添加两个任务,希望它们都在 1s 后执行
    // 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
    timer.schedule(task1, 1000);
    timer.schedule(task2, 1000);

输出

0:46:09.444 c.TestTimer [main] - start...
20:46:10.447 c.TestTimer [Timer-0] - task 1
20:46:12.448 c.TestTimer [Timer-0] - task 2

使用 ScheduledExecutorService 改写:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> 
    System.out.println("任务1,执行时间:" + new Date());
    try  Thread.sleep(2000);  catch (InterruptedException e)  
, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> 
    System.out.println("任务2,执行时间:" + new Date());
, 1000, TimeUnit.MILLISECONDS);

输出

任务1,执行时间:Thu Jan 03 12:45:17 CST 2019
任务2,执行时间:Thu Jan 03 12:45:17 CST 2019

scheduleAtFixedRate 例子:

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> 
    log.debug("running...");
, 1, 1, TimeUnit.SECONDS);

输出

21:45:43.167 c.TestTimer [main] - start...
21:45:44.215 c.TestTimer [pool-1-thread-1] - running...
21:45:45.215 c.TestTimer [pool-1-thread-1] - running...
21:45:46.215 c.TestTimer [pool-1-thread-1] - running...
21:45:47.215 c.TestTimer [pool-1-thread-1] - running...

scheduleAtFixedRate 例子(任务执行时间超过了间隔时间):

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> 
    log.debug("running...");
    sleep(2);
, 1, 1, TimeUnit.SECONDS);

输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s

21:44:30.311 c.TestTimer [main] - start...
21:44:31.360 c.TestTimer [pool-1-thread-1] - running...
21:44:33.361 c.TestTimer [pool-1-thread-1] - running...
21:44:35.362 c.TestTimer [pool-1-thread-1] - running...
21:44:37.362 c.TestTimer [pool-1-thread-1] - running...

scheduleWithFixedDelay 例子:

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(()-> 
    log.debug("running...");
    sleep(2);
, 1, 1, TimeUnit.SECONDS);

输出分析:一开始,延时 1sscheduleWithFixedDelay 的间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始 所以间隔都是 3s

21:40:55.078 c.TestTimer [main] - start...
21:40:56.140 c.TestTimer [pool-1-thread-1] - running...
21:40:59.143 c.TestTimer [pool-1-thread-1] - running...
21:41:02.145 c.TestTimer [pool-1-thread-1] - running...
21:41:05.147 c.TestTimer [pool-1-thread-1] - running...

(9) 正确处理执行任务异常

方法1:主动捉异常

ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> 
    try 
        log.debug("task1");
        int i = 1 / 0;
     catch (Exception e) 
        log.error("error:", e);
    
);

输出

21:59:04.558 c.TestTimer [pool-1-thread-1] - task1
21:59:04.562 c.TestTimer [pool-1-thread-1] - error:
java.lang.ArithmeticException: / by zero
        at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

方法2:使用 Future

ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> 
    log.debug("task1");
    int i = 1 / 0;
    return true;
);
log.debug("result:", f.get());

输出

21:54:58.208 c.TestTimer [pool-1-thread-1] - task1
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at cn.itcast.n8.TestTimer.main(TestTimer.java:31)
Caused by: java.lang.ArithmeticException: / by zero
        at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

* 应用之定时任务

定期执行

如何让每周四 18:00:00 定时执行任务?

// 获得当前时间
LocalDateTime now = LocalDateTime.now();
// 获取本周四 18:00:00.000
LocalDateTime thursday =
    now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);
// 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000
if(now.compareTo(thursday) >= 0) 
    thursday = thursday.plusWeeks(1);

// 计算时间差,即延时执行时间
long initialDelay = Duration.between(now, thursday).toMillis();
// 计算间隔时间,即 1 周的毫秒值
long oneWeek = 7 * 24 * 3600 * 1000;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
System.out.println("开始时间:" + new Date());
executor.scheduleAtFixedRate(() -> 
    System.out.println("执行时间:" + new Date());
, initialDelay, oneWeek, TimeUnit.MILLISECONDS);

(10) tomcat 线程池

Tomcat 在哪里用到了线程池呢

  • LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
  • Acceptor 只负责【接收新的 socket 连接】
  • Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
  • 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
  • Executor 线程池中的工作线程最终负责【处理请求】

Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同

  • 如果总线程数达到 maximumPoolSize
    • 这时不会立刻抛 RejectedExecutionException 异常
    • 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

源码 tomcat-7.0.42

public void execute(Runnable command, long timeout, TimeUnit unit) 
    submittedCount.incrementAndGet();
    try 
        super.execute(command);
     catch (RejectedExecutionException rx) 
        if (super.getQueue() instanceof TaskQueue) 
            final TaskQueue queue = (TaskQueue)super.getQueue();
            try 
                if (!queue.force(command, timeout, unit)) 
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.");
                
             catch (InterruptedException x) 
                submittedCount.decrementAndGet();
                Thread.interrupted();
                throw new RejectedExecutionException(x);
            
         else 
            submittedCount.decrementAndGet();
            throw rx;
        
    

TaskQueue.java

public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException 
    if ( parent.isShutdown() )
        throw new RejectedExecutionException(
        "Executor not running, cant force a command into the queue"
    );
    return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected

Connector 配置

配置项 默认值 说明
acceptorThreadCount 1 acceptor 线程数量
pollerThreadCount 1 poller 线程数量
minSpareThreads 10 核心线程数,即 corePoolSize
maxThreads 200 最大线程数,即 maximumPoolSize
executor - Executor 名称,用来引用下面的 Executor

Executor 线程配置

配置项 默认值 说明
threadPriority 5 线程优先级
daemon true 是否守护线程
minSpareThreads 25 核心线程数,即 corePoolSize
maxThreads 200 最大线程数,即 maximumPoolSize
maxIdleTime 60000 线程生存时间,单位是毫秒,默认值即 1 分钟
maxQueueSize Integer.MAX_VALUE 队列长度
prestartminSpareThreads false 核心线程是否在服务器启动时启动
graph LR
a(添加新任务) --> b(提交任务 < 核心线程)
b -->|是| c(加入队列)
b --> |否| d(提交任务 < 最大线程)
d --> |是| e(创建急救线程)
d --> |否| c

1.3 Fork/Join

(1) 概念

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算

所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解

Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率

Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

(2) 使用

提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务

@Slf4j(topic = "c.AddTask")
class AddTask1 extends RecursiveTask<Integer> 
    int n;
    public AddTask1(int n) 
        this.n = n;
    
    @Override
    public String toString() 
        return "" + n + ;
    
    @Override
    protected Integer compute() 
        // 如果 n 已经为 1,可以求得结果了
        if (n == 1) 
            log.debug("join() ", n);
            return n;
        
        // 将任务进行拆分(fork)
        AddTask1 t1 = new AddTask1(n - 1);
        t1.fork();
        log.debug("fork()  + ", n, t1);
        // 合并(join)结果
        int result = n + t1.join();
        log.debug("join()  +  = ", n, t1, result);
        return result;
    

然后提交给 ForkJoinPool 来执行

public static void main(String[] args) 
    ForkJoinPool pool = new ForkJoinPool(4);
    System.out.println(pool.invoke(new AddTask1(5)));

结果

[ForkJoinPool-1-worker-0] - fork() 2 + 1
[ForkJoinPool-1-worker-1] - fork() 5 + 4
[ForkJoinPool-1-worker-0] - join() 1
[ForkJoinPool-1-worker-0] - join() 2 + 1 = 3
[ForkJoinPool-1-worker-2] - fork() 4 + 3
[ForkJoinPool-1-worker-3] - fork() 3 + 2
[ForkJoinPool-1-worker-3] - join() 3 + 2 = 6
[ForkJoinPool-1-worker-2] - join() 4 + 3 = 10
[ForkJoinPool-1-worker-1] - join() 5 + 4 = 15
15

用图来表示

graph LR
t1("t1 5 + 4")
t2("t2 4 + 3")
t3("t3 3 + 2")
t00("t0 2 + 1")
t0("t0")

t00 -- "1" --> t0
t0 -. "1" .-> t00
t3 -. "2" .-> t00
t00 -. "3" .-> t3
t2 -- "3" --> t3
t3 -. "6" .-> t2
t1 -- "4" --> t2
t2 -. "10" .-> t1

改进

class AddTask3 extends RecursiveTask<Integer> 
    int begin;
    int end;
    public AddTask3(int begin, int end) 
        this.begin = begin;
        this.end = end;
    
    @Override
    public String toString() 
        return "" + begin + "," + end + ;
    
    @Override
    protected Integer compute() 
        // 5, 5
        if (begin == end) 
            log.debug("join() ", begin);
            return begin;
        
        // 4, 5
        if (end - begin == 1) 
            log.debug("join()  +  = ", begin, end, end + begin);
            return end + begin;
        
        // 1 5
        int mid = (end + begin) / 2; // 3
        AddTask3 t1 = new AddTask3(begin, mid); // 1,3
        t1.fork();
        AddTask3 t2 = new AddTask3(mid + 1, end); // 4,5
        t2.fork();
        log.debug("fork()  +  = ?", t1, t2);
        int result = t1.join() + t2.join();
        log.debug("join()  +  = ", t1, t2, result);
        return result;
    

然后提交给 ForkJoinPool 来执行

public static void main(String[] args) 
    ForkJoinPool pool = new ForkJoinPool(4);
    System.out.println(pool.invoke(new AddTask3(1, 10)));

结果

[ForkJoinPool-1-worker-0] - join() 1 + 2 = 3
[ForkJoinPool-1-worker-3] - join() 4 + 5 = 9
[ForkJoinPool-1-worker-0] - join() 3
[ForkJoinPool-1-worker-1] - fork() 1,3 + 4,5 = ?
[ForkJoinPool-1-worker-2] - fork() 1,2 + 3,3 = ?
[ForkJoinPool-1-worker-2] - join() 1,2 + 3,3 = 6
[ForkJoinPool-1-worker-1] - join() 1,3 + 4,5 = 15
15

用图来表示

graph LR
t1("t1 1,3 + 4,5")
t2("t2 1,2,3,3")
t3("t3")
t0("t0")

t1 -- "1,3" --> t2
t1 -- "4,5" --> t3
t2 -- "1,2" --> t0
t2 -- "3,3" --> t0
t0 -. "3".->  t2
t0 -. "3" .-> t2
t2 -. "6" .- t1
t3 -. "9" .-> t1
t1 -. 15 .-> 结果

2. J.U.C

2.1 AQS 原理

概述

全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

特点:

  • 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    • getState - 获取 state 状态
    • setState - 设置 state 状态
    • compareAndSetState - cas 机制设置 state 状态
    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

获取锁的姿势

// 如果获取锁失败
if (!tryAcquire(arg)) 
    // 入队, 可以选择阻塞当前线程 park unpark

释放锁的姿势

// 如果释放锁成功
if (tryRelease(arg)) 
    // 让阻塞线程恢复运行

实现不可重入锁

自定义同步器
final class MySync extends AbstractQueuedSynchronizer 
    @Override
    protected boolean tryAcquire(int acquires) 
        if (acquires == 1)
            if (compareAndSetState(0, 1)) 
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            
        
        return false;
    
    @Override
    protected boolean tryRelease(int acquires) 
        if(acquires == 1) 
            if(getState() == 0) 
                throw new IllegalMonitorStateException();
            
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        
        return false;
    
    protected Condition newCondition() 
        return new ConditionObject();
    
    @Override
    protected boolean isHeldExclusively() 
        return getState() == 1;
    

自定义锁

有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁

class MyLock implements Lock 
    static MySync sync = new MySync();
    @Override
    // 尝试,不成功,进入等待队列
    public void lock() 
        sync.acquire(1);
    
    @Override
    // 尝试,不成功,进入等待队列,可打断
    public void lockInterruptibly() throws InterruptedException 
        sync.acquireInterruptibly(1);
    
    @Override
    // 尝试一次,不成功返回,不进入队列
    public boolean tryLock() 
        return sync.tryAcquire(1);
    
    @Override
    // 尝试,不成功,进入等待队列,有时限
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    
    @Override
    // 释放锁
    public void unlock() 
        sync.release(1);
    
    @Override
    // 生成条件变量
    public Condition newCondition() 
        return sync.newCondition();
    

测试一下

MyLock lock = new MyLock();
new Thread(() -> 
    lock.lock();
    try 
        log.debug("locking...");
        sleep(1);
     finally 
        log.debug("unlocking...");
        lock.unlock();
    
,"t1").start();
new Thread(() -> 
    lock.lock();
    try 
        log.debug("locking...");
     finally 
        log.debug("unlocking...");
        lock.unlock();
    
,"t2").start();

输出

22:29:28.727 c.TestAqs [t1] - locking...
22:29:29.732 c.TestAqs [t1] - unlocking...
22:29:29.732 c.TestAqs [t2] - locking...
22:29:29.732 c.TestAqs [t2] - unlocking...

不可重入测试

如果改为下面代码,会发现自己也会被挡住(只会打印一次 locking)

lock.lock();
log.debug("locking...");
lock.lock();
log.debug("locking...")

心得

起源

早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不够优雅,于是在 JSR166(java 规范提案)中创建了 AQS,提供了这种通用的同步器机制。

目标

AQS 要实现的功能目标

  • 阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire
  • 获取锁超时机制
  • 通过打断取消机制
  • 独占机制及共享机制
  • 条件不满足时的等待机制

要实现的性能目标

设计

AQS 的基本思想其实很简单

获取锁的逻辑

while(state 状态不允许获取) 
    if(队列中还没有此进程) 
        入队并阻塞
    

当前线程出队

释放锁的逻辑

if(state 状态允许了) 
    恢复阻塞的线程(s)

要点

  • 原子维护 state 状态
  • 阻塞及恢复线程
  • 维护队列

1) state设计

  • tate 使用 volatile 配合 cas 保证其修改时的原子性
  • state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想

2) 阻塞恢复设计

  • 早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume那么 suspend 将感知不到
  • 解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark再park 也没问题
  • park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
  • park 线程还可以通过 interrupt 打断

3) 队列设计

  • 用了 FIFO 先入先出队列,并不支持优先级队列
  • 设计时借鉴了 CLH 队列,它是一种单向无锁队列
graph LR
subgraph 初始
head1(head) --> Dumy1(Dumy)
tail1(tail) --> Dumy1
end

subgraph 新节点
head2(head) --> Dumy2(Dumy)
tail2(tail) --> Node
Node -- "prev" --> Dumy2

end
style Dumy1 fill:#f9f,stroke:#333,stroke-width:2px
style Dumy2 fill:#f9f,stroke:#333,stroke-width:2px

队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas 使用,每个节点有 state 维护节点状态入队伪代码,只需要考虑 tail 赋值的原子性

do 
    // 原来的 tail
    Node prev = tail;
    // 用 cas 在原来 tail 的基础上改为 node
 while(tail.compareAndSet(prev,node))

出队伪代码

// prev 是上一个节点
while (Node prev=node.prev).state != 唤醒状态) 
    

// 设置头结点
head = node;

CLH好处:

  • 无锁,使用自旋
  • 快速,无阻塞

AQS 在一些方面改进了CLH

private Node enq(final Node node) 
    for (;;) 
        Node t = tail;
        // 队列中还没有元素 tail 为 null
        if (t == null) 
            // 将 head 从 null -> dummy
            if (compareAndSetHead(new Node()))
                tail = head;
         else 
            // 将 node 的 prev 设置为原来的 tail
            node.prev = t;
            // 将 tail 从原来的 tail 设置为 node
            if (compareAndSetTail(t, node)) 
                // 原来 tail 的 next 设置为 node
                t.next = node;
                return t;
            
        
    

主要用到ASQ 的并发工具类

2.2 RenntrantLock 原理

非公平锁实现原理

加锁解锁流程

先从构造器开始看,默认为非公平锁实现

public ReentrantLock() 
    sync = new NonFairSync();F

NonfairSync继承自ASQ

没有竞争时

第出现时一个竞争

Thread-1 执行了

(1) CAS 尝试将 state 由 0 改为 1,结果失败

(2)进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败

(3) 接下来进入 addWaiter 逻辑,构造 Node 队列

  • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
  • Node 的创建是懒惰的
  • 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

当前线程进入 acquireQueued 逻辑

(1) acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞 (2) 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为1,失败 (3) 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false

(4) shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败 (5) 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true (6) 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

再次有多个线程经历上述过程竞争失败,变成这个样子

Thread-0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程

找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1回到 Thread-1 的 acquireQueued 流程

如果加锁成功(没有竞争),会设置

  • exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
  • 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

如果不巧又被 Thread-4 占了先

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码
// Sync 继承自 AQS
static final class NonfairSync extends Sync 
    private static final long serialVersionUID = 7316153563782823691L;
    // 加锁实现
    final void lock() 
        // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 如果尝试失败,进入 ㈠
            acquire(1);
    
    // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
    public final void acquire(int arg) 
        // ㈡ tryAcquire
        if (
            !tryAcquire(arg) &&
            // 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) 
            selfInterrupt();
        
    
    // ㈡ 进入 ㈢
    protected final boolean tryAcquire(int acquires) 
        return nonfairTryAcquire(acquires);
    
    // ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
    final boolean nonfairTryAcquire(int acquires) 
        final Thread current = Thread.currentThread();
        int c = getState();
        // 如果还没有获得锁
        if (c == 0) 
            // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
            if (compareAndSetState(0, acquires)) 
                setExclusiveOwnerThread(current);
                return true;
            
        
        // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
        else if (current == getExclusiveOwnerThread()) 
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        
        // 获取失败, 回到调用处
        return false;
    
    // ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
    private Node addWaiter(Node mode) 
        // 将当前线程关联到一个 Node 对象上, 模式为独占模式
        Node node = new Node(Thread.currentThread(), mode);
        // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
        Node pred = tail;
        if (pred != null) 
            node.prev = pred;
            if (compareAndSetTail(pred, node)) 
                // 双向链表
                pred.next = node;
                return node;
            
        
        // 尝试将 Node 加入 AQS, 进入 ㈥
        enq(node);
        return node;
    
    // ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
    private Node enq(final Node node) 
        for (;;) 
            Node t = tail;
            if (t == null) 
                // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
                if (compareAndSetHead(new Node())) 
                    tail = head;
                
             else 
                // cas 尝试将 Node 对象加入 AQS 队列尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) 
                    t.next = node;
                    return t;
                
            
        
    
    // ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
    final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                final Node p = node.predecessor();
                // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
                if (p == head && tryAcquire(arg)) 
                    // 获取成功, 设置自己(当前线程对应的 node)为 head
                    setHead(node);
                    // 上一个节点 help GC
                    p.next = null;
                    failed = false;
                    // 返回中断标记 false
                    return interrupted;
                
                if (
                    // 判断是否应当 park, 进入 ㈦
                    shouldParkAfterFailedAcquire(p, node) &&
                    // park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
                    parkAndCheckInterrupt()
                ) 
                    interrupted = true;
                
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    
    // ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
        // 获取上一个节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) 
            // 上一个节点都在阻塞, 那么自己也阻塞好了
            return true;
        
        // > 0 表示取消状态
        if (ws > 0) 
            // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
            do 
                node.prev = pred = pred.prev;
             while (pred.waitStatus > 0);
            pred.next = node;
         else 
            // 这次还没有阻塞
            // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        
        return false;
    
    // ㈧ 阻塞当前线程
    private final boolean parkAndCheckInterrupt() 
        LockSupport.park(this);
        return Thread.interrupted();
    

解锁源码
// Sync 继承自 AQS
static final class NonfairSync extends Sync 
    // 解锁实现
    public void unlock() 
        sync.release(1);
    
    // AQS 继承过来的方法, 方便阅读, 放在此处
    public final boolean release(int arg) 
        // 尝试释放锁, 进入 ㈠
        if (tryRelease(arg)) 
            // 队列头节点 unpark
            Node h = head;
            if (
                // 队列不为 null
                h != null &&
                // waitStatus == Node.SIGNAL 才需要 unpark
                h.waitStatus != 0
            ) 
                // unpark AQS 中等待的线程, 进入 ㈡
                unparkSuccessor(h);
            
            return true;
        
        return false;
    
    // ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
    protected final boolean tryRelease(int relea

以上是关于JUC - 共享模型之工具 - 第六篇的主要内容,如果未能解决你的问题,请参考以下文章

JUC学习之共享模型工具之JUC并发工具包上

JUC学习之共享模型之工具上之线程池浅学

第六篇 VGGNet——模型精讲

第六篇:协调和协定之组通信

构建之法—第六篇

JUC学习之共享模型之内存