并发编程:AQS

Posted 三杯然诺

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程:AQS相关的知识,希望对你有一定的参考价值。

  AQS全称为AbstractQueuedSynchronizer,是并发容器中的同步器,AQS是J.U.C的核心,它是抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类都依赖它,如ReentrantLock、Semaphore、CyclicBarrier、ReentrantLock、Condition、FutureTask等。

 

  AQS的特点:

  a、使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架

  b、利用一个int类型表示状态

  c、使用方法是继承

  d、子类通过继承并通过实现它的方法管理其状态

  e、可以同时实现排它锁和共享锁模式(独占、共享)

 

  AQS实现原理:

  AQS维护了一个volatile int state和一个FIFO线程等待队列

  state的访问方式有三种:getState(),setState(),compareAndSetState()

  AQS定义两种资源共享方式,Exclusive(独占,只有一个线程能执行),Share(共享,多个线程可同时执行)

  自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,具体线程等待队列的维护,如获取资源失败入队,唤醒出队等,AQS在顶层实现好了。自定义同步器实现时主要实现以下几种方法;

  tryAcquire(int):独占方式,尝试获取资源,成功返回true,失败返回false

  tryRelease(int): 独占方法,尝试释放资源,成功则返回true,失败则返回false

  tryAcquireShared(int):共享方式,尝试获取资源,负数表示失败,0表示成功,但没有剩余可用资源,正数表示成功,且有剩余资源

  tryReleaseShared(int):共享方式,尝试释放资源,如果释放后允许唤醒后续等待节点返回true,否则返回false 

 

  acquire(int)

  a、tryAcquire()尝试直接去获取资源,如果成功则直接返回;

  b、addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式

  c、acquireQueueed() 使线程在等待队列中休息,有机会时会去尝试获取资源,获取到资源后才返回,如果在正在等待过程中被中断过,则返回true,否则返回false

  d、如果线程在等待过程中被中断过,它是不响应的,只是获取资源后才进行自我中断selfInterrupt(),将中断补上

 

  release(int)

  此方法是独占模式下线程释放共享资源的顶层入口,它会释放指定量的资源,如果彻底释放了(state=0),它会唤醒等待队列里的其他线程来获取资源

 

  acquireShared(int)

  此方法是共享模式下线程获取共享资源的顶层入口,它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止, 跟独占模式比这里只有线程是head.next时,才会去尝试获取资源,有剩余的话还会唤醒之后的队友,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要两个,老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢还是不让,答案是老二会继续park(),等待其他线程释放资源,也更不会去唤醒老三和老四了,独占模式同一时刻只有一个线程去执行,但共享模式下,多个线程是可以同时执行的,因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了,它跟acqure()流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作

 

  releaseShared()

  此方式是共享模式下线程释放共享资源的顶层入口,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源,也就是释放掉资源后,唤醒后继

 

  下面我们介绍一下通过AQS实现的类的例子,CountDownLatch、Semaphore、CyclicBarrier、ReentrantLock等都是通过AQS实现的,其中CountDownLatch和Semaphore我们已经在前面的博客中说过了,我们着重来看剩下的两个类

 

  CyclicBarrier

  它允许一组工作线程相互等待,直到到达某个工作屏障点,只有当每个线程都准备就绪后才能继续执行后面的操作,它和CountDownLatch有相似的地方都是通过计数器实现的,但它在释放等待线程后可以重用,是循环屏障,可以一直循环来使用(计数器可重置)。CountDownLatch是一个或多个线程等待一个线程的关系,CyclicBarrier主要是实现了多个线程之间相互等待,直到所有线程都满足条件后才能继续后续的操作,如有五个线程在等待,只有这5个线程都调用了await()方法后才能继续执行

 

 

  CyclicBarrier-demo1

@Slf4j
public class CyclicBarrierExample1 {

    //定义有多少线程同步等待
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args)throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(()->{

                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception",e);
                }

            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum)throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready",threadNum);
        barrier.await();
        log.info("{} continue",threadNum);

    }
}

输出如下:

  使用此类时需先给出需要相互等待的线程数,如demo中我给出的是5个,当调用await()方法的线程数达到5个后才能继续执行,await()还可以设置等待的时候,如果超过等待时间则继续往下执行

 

  CyclicBarrier-demo2

@Slf4j
public class CyclicBarrierExample3 {

//    达到屏障之后,优先执行callback is running
    private static CyclicBarrier barrier = new CyclicBarrier(5,()->{
        log.info("callback is running");
    });

    public static void main(String[] args)throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(()->{

                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception",e);
                }

            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum)throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready",threadNum);
        barrier.await();
        log.info("{} continue",threadNum);

    }
}

输出为:

  此demo告诉我们当线程到达屏障的时候,可设置优先执行的代码

 

  ReentrantLock

  我们首先说一下ReentrantLock和synchronized的区别

  a、它俩都是可重入锁,都是同一个线程进入一次锁的计数器就自增1,所以等到锁的计数器下降为0时才会释放锁

  b、synchronized是JVM实现的,ReentrantLock是JDK实现的

  c、synchronized引入偏向锁、自旋锁后性能已经和ReentrantLock差不多了,官方推荐使用synchronized

  d、synchronized使用更加简洁,是由编译器保证锁的加锁和释放的,ReentrantLock需要手工加锁和释放锁,但在锁的细粒度和灵活度方面ReentrantLock会优于synchronized

 

  下面是ReentrantLock独有的功能

  a、可指定是公平锁(先等待的线程先获得锁)还是非公平锁

  b、提供了一个Condition类,可以分组唤醒需要唤醒的线程,而不是像synchronized要么随机唤醒一个线程,要么唤醒所有线程

  c、提供能够中断等待锁的机制

 

  ReentrantLock-demo

@Slf4j
public class LockExample2 {

    //请求总数
    public static int clientTotal = 5000;
    //同时并发执行的线程数
    public static int threadTotal = 200;

    public static int count = 0;

    private final static Lock lock = new ReentrantLock();

    private  static void add() {

        lock.lock();
        try {
            count++;
        }finally {
           lock.unlock();
        }

    }

    public static void main(String[] args)throws Exception {

        //定义线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定义信号量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定义计数器闭锁
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {

                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count);

    }

}

  运行结果为5000,上面就是简单的ReentrantLock的使用

 

  ReentrantReadWriteLock

@Slf4j
public class LockExample3 {

    private final Map<String, Data> map = new TreeMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        }finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {

        readLock.lock();
        try {
            return map.keySet();
        }finally {
            readLock.unlock();
        }
    }



    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key,value);
        }finally {
            writeLock.unlock();
        }
    }

    class Data{

    }

}

  此demo为读写锁demo,对读和写分别进行锁定操作,在获取写入锁的时候,不允许读操作的锁还在保持着,如果运用在读特别多而写特别少的时候,会导致写操作的饥饿,写会一直在等待,不知道什么时候会获取锁

 

  StampedLock

@Slf4j
@ThreadSafe
public class LockExample4 {

    //请求总数
    public static int clientTotal = 5000;
    //同时并发执行的线程数
    public static int threadTotal = 200;

    public static int count = 0;

    private final static StampedLock lock = new StampedLock();

    private  static void add() {

        long stamp = lock.writeLock();
        try {
            count++;
        }finally {
           lock.unlock(stamp);
        }

    }

    public static void main(String[] args)throws Exception {

        //定义线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定义信号量
        final Semaphore semaphore = new Semaphore(threadTotal);
        //定义计数器闭锁
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count);

    }

}

  StampedLock控制锁有三种模式,分别是写、读和乐观读,StampedLokc由版本和模式两个部分组成,使用锁时返回一个数字作为票据,用相应的锁状态来表示并控制相关的访问,数字0表示没有写锁被首先访问,在读锁上分为悲观锁和乐观锁,乐观读就是在读的操作很多而写的操作很少的情况下,我们可以乐观的认为写入与读取同时方法的概率很小,因此不悲观的使用完全锁定,程序可以查看读取资料之后是否遭到写入之前的变更再采取后续的措施,这个改进可以大幅度提高程序的吞吐量

 

  Condition

@Slf4j
public class LockExample5 {

    public static void main(String[] args) {

        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();

        new Thread(()->{
            try {
                reentrantLock.lock();
                log.info("wait signal");
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("get signal");
            reentrantLock.unlock();
        }).start();

        new Thread(()->{
            reentrantLock.lock();
            log.info("get lock");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signalAll();
            log.info("send signal");
            reentrantLock.unlock();
        }).start();
    }
}

  输出结果如下:

  执行过程如下:首先定义ReentrantLock的实例,并从实例中取出Condition,线程一调用reentrantLock.lock()方法,线程加入到aqs的等待队列中去,当调用condition.await()的时候,将线程从aqs队列中移除,对应的操作是锁的释放,并加入到condition的等待队列中去,此线程等待,因为线程一释放锁的关系,线程2被唤醒,线程2获取锁,也加入到aqs的等待队列中,线程2在执行完后执行了condition.sigalAll(),此时线程1被从condition的等待队列中取出放入aqs的等待队列中去,但此时线程一并没有被唤醒,直到线程二执行了reentrantLock.unlock,释放锁,此时aqs的等待队列中只剩下线程一,按照从头到尾的顺序唤醒线程,线程一被唤醒,继续执行,之后线程一释放锁,这个过程完毕

 

  此外最后,向大家推荐一遍关于aqs实现原理的文章,这篇博客讲的非常详细,大家可以仔细研读一下 https://www.cnblogs.com/waterystone/p/4920797.html

以上是关于并发编程:AQS的主要内容,如果未能解决你的问题,请参考以下文章

并发编程:AQS

并发编程的基石——AQS类

Java Review - 并发编程_抽象同步队列AQS

并发编程AQS--------ReentrantLock

并发编程-并发容器(J.U.C)核心 AbstractQueuedSynchronizer 抽象队列同步器AQS介绍

Java并发编程之AQS