AQS 同步组件学习

Posted wcgstudy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS 同步组件学习相关的知识,希望对你有一定的参考价值。

CountDownLatch 实例代码:

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CountDownLatchExample1 

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception 

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) 
            final int threadNum = i;
            exec.execute(() -> 
                try 
                    test(threadNum);
                 catch (Exception e) 
                    log.error("exception", e);
                 finally 
                    countDownLatch.countDown();
                
            );
        
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    

    private static void test(int threadNum) throws Exception 
        Thread.sleep(100);
        log.info("", threadNum);
        Thread.sleep(100);
    

semaphore : 控制并发访问的线程个数

 通过提供同步机制,来控制当前访问的线程个数

 tryacquire: 尝试获取可用资源,如果获取不到就丢弃

 

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@Slf4j
public class SemaphoreExample3 

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception 

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) 
            final int threadNum = i;
            exec.execute(() -> 
                try 
                    if (semaphore.tryAcquire())  // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    
                 catch (Exception e) 
                    log.error("exception", e);
                
            );
        
        exec.shutdown();
    

    private static void test(int threadNum) throws Exception 
        log.info("", threadNum);
        Thread.sleep(1000);
    

cyclicBarrier: 使用场景和countDownLatch的使用场景很类似,但是可以循环的使用

实现了多个线程之间的相互等待,知道所有的线程都执行完成之后,才进行下一步的操作

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class CyclicBarrierExample2 

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception 

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) 
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> 
                try 
                    race(threadNum);
                 catch (Exception e) 
                    log.error("exception", e);
                
            );
        
        executor.shutdown();
    

    private static void race(int threadNum) throws Exception 
        Thread.sleep(1000);
        log.info(" is ready", threadNum);
        try 
            barrier.await(2000, TimeUnit.MILLISECONDS);
         catch (Exception e) 
            log.warn("BarrierException", e);
        
        log.info(" continue", threadNum);
    

设置达到资源屏障时优先执行的方法:

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CyclicBarrierExample3 

    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> 
        log.info("callback is running");
    );

    public static void main(String[] args) throws Exception 

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) 
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> 
                try 
                    race(threadNum);
                 catch (Exception e) 
                    log.error("exception", e);
                
            );
        
        executor.shutdown();
    

    private static void race(int threadNum) throws Exception 
        Thread.sleep(1000);
        log.info(" is ready", threadNum);
        barrier.await();
        log.info(" continue", threadNum);
    

 

以上是关于AQS 同步组件学习的主要内容,如果未能解决你的问题,请参考以下文章

AQS 原理以及 AQS 同步组件总结

从ReentrantLock加锁解锁角度分析AQS

队列同步器——AQS

多线程七 AQS

AQS的几个同步组件

AQS同步组件