深入理解AQS

Posted z啵唧啵唧

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解AQS相关的知识,希望对你有一定的参考价值。

文章目录

深入理解AQS

AQS

概念
  • 是一种阻塞式锁和相关的同步器工具的框架
特点
  • 用state属性来表示资源的状态(分为独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取和释放锁
    • getState获取state状态
    • setState设置state状态
    • compareAndSetState 利用cas机制设置state状态
    • 独占模式是只有一个线程能够访问资源,共享模式是可以允许多个线程访问资源
  • 提供了基于队列的等待队列,类似于Monitor的EntryList
  • 条件变量来实现等待,唤醒机制,支持多个条件变量,类似于Monitor的WaitSet

AOS自定义实现锁

package com.zb.juc.test;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @Description:
 * @Author:啵啵啵啵啵啵唧~~~
 * @Date:2022/4/26
 */
@Slf4j(topic = "c.TestAqs")
public class TestAqs 


/**
 * 自定义锁(不可重入锁)
 */
class MyLock implements Lock 
    /**
     * 独占锁
     */
    class MySync extends AbstractQueuedSynchronizer
        /**
         * 加锁
         * @param arg
         * @return
         */
        @Override
        protected boolean tryAcquire(int arg) 
            if (compareAndSetState(0,1))
                //加上了锁,需要设置owner为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            
            return false;
        

        /**
         * 解锁
         * @param arg
         * @return
         */
        @Override
        protected boolean tryRelease(int arg) 
            //将Qwn设置为空表示没有线程占用
            setExclusiveOwnerThread(null);
            //将状态改为0
            setState(0);
            return true;
        

        /**
         * 是否持有独占锁
         * @return
         */
        @Override
        protected boolean isHeldExclusively() 
            return getState()==1;
        

        public Condition newCondition() 
            return new ConditionObject();
        
    

    private MySync sync = new MySync();

    /**
     * 加锁,不成功进入等待队列等待
     */
    @Override
    public void lock() 
       sync.acquire(1);
    

    /**
     * 加锁,可打断
     */
    @Override
    public void lockInterruptibly() throws InterruptedException 
       sync.acquireInterruptibly(1);
    

    /**
     * 尝试加锁只加锁一次一次失败之后就返回false
     */
    @Override
    public boolean tryLock() 
        return sync.tryAcquire(1);
    

    /**
     * 带超时版本的tryLock
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    

    /**
     * 解锁
     */
    @Override
    public void unlock() 
       sync.release(1);
    

    /**
     * 条件变量
     * @return
     */
    @Override
    public Condition newCondition() 
        return sync.newCondition();
    


ReentrantLock原理

非公平锁实现原理
加锁解锁原理
  • 先看构造器,默认为非公平锁的实现
/**
 * Creates an instance of @code ReentrantLock.
 * This is equivalent to using @code ReentrantLock(false).
 */
public ReentrantLock() 
    //NonfairSync继承自AQS
    sync = new NonfairSync();

  • 非公平锁加锁源码
final void lock() 
    if (compareAndSetState(0, 1))
        //没有竞争的时候状态为1,进行加锁->将owner设置为当前线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);

  • 没有竞争的时候,就加锁成功了
竞争失败原理
  • 出现竞争的时候
final void lock() 
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //出现竞争的时候即当前的状态不为1->调用这个acquire()方法
                acquire(1);
        
  • acquire()方法
public final void acquire(int arg) 
       //acquire()方法会调用一个tryAcquire(arg)方法其实就是尝试获取锁,
        if (!tryAcquire(arg) &&
            //如果再调用这个tryAcquire(arg)方法的时候其他线程恰好释放了锁,那么tryAcquire(arg) 方法的返回值就是false就不会走这个if块,否者就会走这个if语句块,执行acquireQueued()方法这个方法的作用式添加一个节点进入阻塞
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    

  • 如果尝试获取锁失败会进入acquireQueued()方法在这个方法里面其实还会再尝试获取几次
/**
当前线程进入acquireQueued的逻辑
1、acquireQueued会在一个死循环中不断尝试获取锁,失败之后进入park阻塞
2、如果自己是紧邻着head那么自己排在第二位,此时尝试再次获取锁,如果这时恰好人家咱占有者释放锁成功,那么自己就有机会获得锁,如果人家没有释放锁释放锁,就还是获取失败
3、获取失败进入shouldParkAfterFailedAcquire逻辑将前驱node,即head的waitStatus改为-1,改为-1的意思是前驱节点需要唤醒这个后继节点,然后返回false再次尝试
4、shouldParkAfterFailedAcquire执行完毕之后回到这个acquireQueued,再次尝试tryAcquire,如果失败
5、这时候再次进入shouldParkAfterFailedAcquire方法时,因为前驱节点waitStatus已经时-1了,这次返回true
6、shouldParkAfterFailedAcquire返回true就会进入到这个parkAndCheckInterrupt方法,进行park
**/

//--------------------------------------------------------------------------------
//这就是等待对列 
final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                //获取前驱节点
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                //不再进行尝试
                cancelAcquire(node);
        
    

//------------------------------------------------------------------------------------
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 该节点已经设置了请求释放信号的状态,因此可以安全停止
             */
            return true;
        if (ws > 0) 
            /*
             * 前任被取消了。跳过前导并指示重试。
             */
            do 
                node.prev = pred = pred.prev;
             while (pred.waitStatus > 0);
            pred.next = node;
         else 
            /*
             * waitStatus必须为0或0。表明我们需要信号,但先别停车。打电话的人需要重试以确保在停车前无法获取。
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        
        return false;
    

//---------------------------------------------------------------------------------------
   private final boolean parkAndCheckInterrupt() 
        LockSupport.park(this);
        return Thread.interrupted();
    
RenntrantLock可重入的原理
  • nonfairTryAcquire()获取锁源码分析
 //acquires 为1
final boolean nonfairTryAcquire(int acquires) 
     //先获取当前的线程
            final Thread current = Thread.currentThread();
     //获取当前线程的状态
            int c = getState();
     //判断当前线程的状态是为0,为0表示没有线程占用
            if (c == 0) 
                //没有线程占用的话直接使用CAS进行交换,此时状态就改变为1
                if (compareAndSetState(0, acquires)) 
                    //设置当前锁的占有者为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                
            
    //如果状态不为0,表示当前锁还被占有着呢,此时线程比一定会被阻塞住,他会先判断锁的占有者是否是自己
            else if (current == getExclusiveOwnerThread()) 
                //如果此时锁的占有者就是自己,那么将state这个状态进行+1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            
            return false;
        
可打断原理
可打断模式
  • 在此模式下,即使他被打断,仍然会被驻留在AQS对罗列中,等待获得锁之后才能继续运行
  • 线程在没有办法获得锁的时候会进入这个acquireQueued()方法进行循环尝试
  • acquireQueued方法分析
private final boolean parkAndCheckInterrupt() 
    //如果打断标记已经是true,则park会失效
        LockSupport.park(this);
    //interrupted 会清除打断标记
        return Thread.interrupted();
    


final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //尝试仍不成功进入这个park方法
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    
公平锁实现原理
非公平锁实现
        final boolean nonfairTryAcquire(int acquires) 
            final Thread current = Thread.currentThread();
            int c = getState();
            //判断是否有获得锁
            if (c == 0) 
                //没有线程获得这个把锁的时候,当前线程直接尝试CAS获得,不会去检查AQS队列,所以是非公平的
                if (compareAndSetState(0, acquires)) 
                    setExclusiveOwnerThread(current);
                    return true;
                
            
            else if (current == getExclusiveOwnerThread()) 
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            
            return false;
        
公平锁实现
  • 公平锁和非公平锁区别主要在于 tryAcquire方法的实现
  • 公平锁在线程进来之后会先判断AQS队列当中是否有这个前序节点,没有才会去竞争

读写锁

ReentrantReadWriteLock
  • 当读操作远远高于写操作时,这时候使用读写锁让读读可以并发,提高性能。
  • 类似于数据库当中的select … from … lock in share mode
  • 提供一个 数据容器类 内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法
/**
 * 对数据进行一些读写实验
 */
@Slf4j(topic = "c.DataContainer")
class DataContainer
    /**
     * 读写操作的数据
     */
    private Object data;
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    /**
     * 获取读锁对象
     */
    private ReentrantReadWriteLock.ReadLock r = rw.readLock();
    /**
     * 获取写锁对象
     */
    private ReentrantReadWriteLock.WriteLock w = rw.writeLock();

    /**
     * 读取数据操作
     * @return
     */
    public Object read()
        log.debug("获取读锁");
        r.lock();
        try 
            log.debug("读取");
            Thread.sleep(1000L);
         catch (InterruptedException e) 
            e.printStackTrace();
         finally 
            log.debug("释放读锁");
            r.unlock();
            return data;
        
    

    /**
     * 写数据操作
     * @param data
     */
    public void write(Object data)
        log.debug("获取写锁");
        w.lock();
        try 
            log.debug("写入");
            this.data = data;
        finally 
            log.debug("释放写锁");
            w.unlock();
        
    


  • 验证读读不互斥
//开两个线程同时调用对这个数据进行读操作
@Slf4j(topic = "c.TestReadWriteLock")
public class TestReadWriteLock 
    public static void main(String[] args) 
        DataContainer dc = new DataContainer();
        new Thread(()->
            dc.read();
        ,"t1").start();
        new Thread(()->
            dc.read();
        ,"t2").start();
    

读读并不会互斥

  • 验证读写操作的互斥
@Slf4j(topic = "c.TestReadWriteLock")
public class TestReadWriteLock 
    public static void main(String[] args) throws InterruptedException 
        DataContainer dc = new DataContainer();
        new Thread(()->
            dc.read();
        ,"t1").start();
        Thread.sleep(100L);
        new Thread(()->
            dc.write(1);
        ,"t2").start();
    

读写互斥

注意事项
  • 读写锁不支持条件变量
  • 重入时升级不支持,即持有读锁的情况下去获取写锁,会导致读写锁永久的等待
  • 重入时降级时支持的,意思就是持有写锁的情况下去获取读锁是支持的
class CachedData 
    Object data;
    // 是否有效,如果失效,需要重新计算 data
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() 
        rwl.readLock().lock();
        if (!cacheValid) 
// 获取写锁前必须释放读锁
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try 
// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
                if (!cacheValid) 以上是关于深入理解AQS的主要内容,如果未能解决你的问题,请参考以下文章

进阶笔录-深入理解Java线程之-AQS

深入理解AQS(二)- 共享模式

聊聊高并发(二十四)解析java.util.concurrent各个组件 深入理解AQS

深入理解AbstractQueuedSynchronizer

深入浅出ReentrantReadWriteLock源码解析

深入java并发包源码AQS的介绍与使用