JUC学习之共享模型工具之JUC并发工具包上

Posted 大忽悠爱忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC学习之共享模型工具之JUC并发工具包上相关的知识,希望对你有一定的参考价值。


AQS 原理

概述

全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

特点:

  • state 属性来表示资源的状态(分独占模式共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁

getState - 获取 state 状态

setState - 设置 state 状态

compareAndSetState - cas 机制设置 state 状态

独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源

  • 提供了基于 FIFO 的等待队列,类似于 MonitorEntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 MonitorWaitSet

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

获取锁的姿势

// 如果获取锁失败
if (!tryAcquire(arg)) 
 // 入队, 可以选择阻塞当前线程 park unpark

释放锁的姿势

// 如果释放锁成功
if (tryRelease(arg)) 
 // 让阻塞线程恢复运行


实现不可重入锁

自定义同步器

package schedule;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

//自定义锁(不可重入锁)
final class MySync extends AbstractQueuedSynchronizer 

   /**
   @param acquires 可重入数用来计数的,因为是不可重入锁,因此如果这里acquires的值大于一就返回false,表示加锁失败
    */
    @Override
    protected boolean tryAcquire(int acquires) 
        if (acquires == 1) 
            //尝试加锁
            if (compareAndSetState(0, 1)) 
                //加上了锁,并设置owner为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            
        
        //加锁失败
        return false;
    

    /**
     @param acquires 可重入数用来计数的,因为是不可重入锁,因此如果这里acquires的值大于一就返回false,表示解锁失败
     */
    @Override
    protected boolean tryRelease(int acquires) 
        if (acquires == 1) 
            //当前为没有上锁的状态
            if (getState() == 0) 
                throw new IllegalMonitorStateException();
            
            //设置当前没有线程占用锁
            setExclusiveOwnerThread(null);
            //解锁--- private volatile int state
            //因为state是volatile,因此将setState(0);方法执行放在setExclusiveOwnerThread(null);方法执行之前可以确保
            //不会产生指令重排; 确保线程可见性;----exclusiveOwnerThread不是volatile
            setState(0);
            return true;
        
        return false;
    

    //条件变量
    protected Condition newCondition() 
        return new ConditionObject();
    

    //是否持有独占锁--1:持有; 0不持有
    @Override
    protected boolean isHeldExclusively() 
        return getState() == 1;
    

同步器类中的大部分方法由其父类提供


自定义锁

有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁

package schedule;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

class MyLock implements Lock 
 static MySync sync = new MySync();

 @Override
 // 尝试,不成功,进入等待队列
 public void lock() 
  sync.acquire(1);
 

 @Override
 // 尝试,不成功,进入等待队列,可打断
 public void lockInterruptibly() throws InterruptedException 
  sync.acquireInterruptibly(1);
 

 @Override
 // 尝试一次,不成功返回,不进入队列
 public boolean tryLock() 
  return sync.tryAcquire(1);
 

 @Override
 // 尝试,不成功,进入等待队列,有时限
 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
  return sync.tryAcquireNanos(1, unit.toNanos(time));
 

 @Override
 // 释放锁
 public void unlock() 
  sync.release(1);
 

 @Override
 // 生成条件变量
 public Condition newCondition() 
  return sync.newCondition();
 


release和tryRelease的区别:


测试一下

       MyLock lock = new MyLock();
        new Thread(() -> 
            lock.lock();
            try 
                log.debug("locking...");
                sleep(1);
             catch (InterruptedException e) 
                e.printStackTrace();
             finally 
                log.debug("unlocking...");
                lock.unlock();
            
        , "t1").start();

        new Thread(() -> 
            lock.lock();
            try 
                log.debug("locking...");
             finally 
                log.debug("unlocking...");
                lock.unlock();
            
        , "t2").start();

输出

22:29:28.727 c.TestAqs [t1] - locking... 
22:29:29.732 c.TestAqs [t1] - unlocking... 
22:29:29.732 c.TestAqs [t2] - locking... 
22:29:29.732 c.TestAqs [t2] - unlocking...

不可重入测试

如果改为下面代码,会发现自己也会被挡住(只会打印一次 locking)

lock.lock();
log.debug("locking...");
lock.lock();
log.debug("locking...");

心得

起源

早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不够优雅,于是在 JSR166(java 规范提案)中创建了 AQS,提供了这种通用的同步器机制。

目标

AQS 要实现的功能目标

  • 阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire
  • 获取锁超时机制
  • 通过打断取消机制
  • 独占机制及共享机制
  • 条件不满足时的等待机制

要实现的性能目标

Instead, the primary performance goal here is scalability: topredictably maintain efficiency even, or especially, when
synchronizers are contended.


设计

AQS 的基本思想其实很简单

获取锁的逻辑

while(state 状态不允许获取) 
 if(队列中还没有此线程) 
 入队并阻塞
 

当前线程出队

释放锁的逻辑

if(state 状态允许了) 
 恢复阻塞的线程(s) 

要点

  • 原子维护 state 状态
  • 阻塞及恢复线程
  • 维护队列

1) state 设计

  • state 使用 volatile 配合 cas 保证其修改时的原子性
  • state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想

2) 阻塞恢复设计

  • 早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume 那么suspend 将感知不到
  • 解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没问题
  • park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
  • park 线程还可以通过 interrupt 打断

3) 队列设计

使用了 FIFO 先入先出队列,并不支持优先级队列

设计时借鉴了 CLH 队列,它是一种单向无锁队列


队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas 使用,每个节点有 state 维护节点状态

入队伪代码,只需要考虑 tail 赋值的原子性

do 
 // 原来的 tail
 Node prev = tail;
 // 用 cas 在原来 tail 的基础上改为 node
 while(tail.compareAndSet(prev, node))

出队伪代码

// prev 是上一个节点
while((Node prev=node.prev).state != 唤醒状态) 

// 设置头节点
head = node;

CLH 好处:

  • 无锁,使用自旋
  • 快速,无阻塞

AQS 在一些方面改进了 CLH

        private Node enq ( final Node node)
            for (; ; ) 
                Node t = tail;
                // 队列中还没有元素 tail 为 null
                if (t == null) 
                    // 将 head 从 null -> dummy
                    if (compareAndSetHead(new Node()))
                        tail = head;
                 else 
                    // 将 node 的 prev 设置为原来的 tail
                    node.prev = t;
                    // 将 tail 从原来的 tail 设置为 node
                    if (compareAndSetTail(t, node)) 
                        // 原来 tail 的 next 设置为 node
                        t.next = node;
                        return t;
                    
                
            
        

主要用到 AQS 的并发工具类


ReentrantLock 原理


非公平锁实现原理

加锁流程

先从构造器开始看,默认为非公平锁实现

public ReentrantLock() 
 sync = new NonfairSync();

NonfairSync 继承自 AQS

没有竞争,加锁成功的情况:





第一个竞争出现时

Thread-1 执行了

  1. CAS 尝试将 state 由 0 改为 1,结果失败
  2. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
  3. 接下来进入 addWaiter 逻辑,构造 Node 队列
  • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
  • Node 的创建是懒惰的
  • 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程


当前线程进入 acquireQueued 逻辑
4. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
5. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
6. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false


7. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时
state 仍为 1,失败

8.当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回
true

9.进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)


再次有多个线程经历上述过程竞争失败,变成这个样子


解锁流程

Thread-0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

  • 当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程—唤醒后继节点
  • 找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
  • 回到 Thread-1 的 acquireQueued 流程


如果加锁成功(没有竞争),会设置

  • exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
  • 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

如果不巧又被 Thread-4 占了先

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

加锁流程源码分析:






加锁源码

    // Sync 继承自 AQS
    static final class NonfairSync extends Sync 
        private static final long serialVersionUID = 7316153563782823691L;

        // 加锁实现
        final void lock() 
            // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
            if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread());
            else
                // 如果尝试失败,进入 ㈠
                acquire(1);
        

        // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
        public final void acquire(int arg) 
            // ㈡ tryAcquire 
            if (!tryAcquire(arg) &&
                    // 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
                selfInterrupt();
            
        

        // ㈡ 进入 ㈢
        protected final boolean tryAcquire(int acquires) 
            return nonfairTryAcquire(acquires);
        

        // ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
        final boolean nonfairTryAcquire(int acquires) 
            final Thread current = Thread.currentThread();
            int c = getState();
            // 如果还没有获得锁
            if (c == 0) 
                // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
                if (compareAndSetState(0, acquires)) 
                    setExclusiveOwnerThread(current);
                    return true;
                
            
            // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) 
                // state++
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            
            // 获取失败, 回到调用处
            return false;
        

        // ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
        private Node addWaiter(Node mode) 
            // 将当前线程关联到一个 Node 对象上, 模式为独占模式
            Node node = new Node(Thread.currentThread(), mode);
            // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
            Node pred = tail;
            if (pred != null) 
                node.prev = pred;
                if (compareAndSetTail(pred, node)) 
                    // 双向链表
                    pred.next = node;
                    return node;
                
            
            // 尝试将 Node 加入 AQS, 进入 ㈥
            enq(node);
            return node;
        

        // ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
        private Node enq(final Node node) 
            for (; ; ) 
                Node t = tail;
                if (t == null) 
                    // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
                    if (compareAndSetHead(new Node())) 
                        tail = head;
                    
                 else 
                    // cas 尝试将 Node 对象加入 AQS 队列尾部
                    node.prev = t;
                    if (compareAndSetTail(t, node)) 
                        t.next JUC学习之共享模型之内存

JUC学习之共享模型之内存

JUC学习之共享模型上

Java语言特性学习之四JUC

JUC并发编程 共享模式之工具 JUC ReentrantLock -- ReentrantLock原理

JUC并发编程 共享模式之工具 JUC 读写锁 StampedLock -- 介绍 & 使用