一个关于CountDownLatch的并发需求

Posted 次日清晨醒着眼睛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个关于CountDownLatch的并发需求相关的知识,希望对你有一定的参考价值。

需求

A,B,C可并发运行,全部成功才算成功,一个失败全员回滚。

思考

使用CountDownLatch,可以保证三个线程结束后,才进行提交成功状态。但是怎么才能判断某个任务失败了呢?

  1. 捕获子线程异常?
  2. await(long timeout, TimeUnit unit)?

陷入了沉思

加一个原子变量判断子线程异常的次数不就OK嘛(分布式用分布式锁,单机用原子类)

    @GetMapping("/id")
    String test(@PathVariable("id") String id) 

        ThreadPoolExecutor threadPoolExecutor = ExecutorFactory.newCustomerThreadExecutor(3, 3, 1000, new NameThreadFactory("画像表"));

        // 失败线程数
        LongAdder failThreadNum = new LongAdder();

        int threadSize = 2;
        CountDownLatch cdl = new CountDownLatch(threadSize);


        Thread t1 = new Thread(() -> 
            try 
                System.out.println(Thread.currentThread().getName());
                if (Objects.equals(id, "1")) 
                    throw new RuntimeException();
                
             catch (Exception e) 
                failThreadNum.increment();
             finally 
                cdl.countDown();
            
        );

        Thread t2 = new Thread(() -> 
            try 
                System.out.println(Thread.currentThread().getName());
             catch (Exception e) 
                failThreadNum.increment();
             finally 
                cdl.countDown();
            
        );

        threadPoolExecutor.submit(t1);
        threadPoolExecutor.submit(t2);

        try 
            cdl.await();
         catch (InterruptedException e) 
            throw new RuntimeException(e);
        
        if (failThreadNum.intValue() != 0) 
            System.out.println("回滚");
         else 
            System.out.println("Main over");
        
        threadPoolExecutor.shutdown();
        return "success";
    

Java并发编程-CountDownLatch

  基于AQS的前世今生,来学习并发工具类CountDownLatch。本文将从CountDownLatch的应用场景、源码原理解析来学习这个并发工具类。

1、 应用场景

  CountDownLatch是并发包中用来控制一个或者多个线程等待其他线程完成操作的并发工具类。现以工作中的一个场景来描述下CountDownLatch的应用,代码如下:

/*
模拟工作中的一个需求场景:
用户会选择多个算法来计算费用,最后会将所有算法计算出的费用做一个加权求平均数,这个平均数是最终的费用。
每个算法的复杂度都不一样,打算每个线程负责一个算法的实现,所有的线程执行完成,最后再求平均数。
1、为每个算法创建一个线程,每个线程负责一个算法的实现
2、通过CountDownLatch来控制所有算法线程的同步
3、全部计算完成后再求平均数
 */
public class CountDownLatchTask {

    public static void main(String[] args) {
        CountDownLatchTask countDownLatchTask = new CountDownLatchTask();
        countDownLatchTask.startThreads(5);
    }
    //根据线程数和选择的算法 调度算法对应的实现
    private void startThreads(int threadNumber) {
        CountDownLatch countDownLatch = new CountDownLatch(threadNumber);
        for (int i = 0; i < threadNumber; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程算法实现:" + Thread.currentThread().getName());
                    countDownLatch.countDown();
                }
            }).start();
        }
        try {
            countDownLatch.await();
            System.out.println("加权求平均数");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  在分析原理实现前,总结下CountDownLatch的作用就是阻塞其他线程直到条件允许后才释放该阻塞,除了上述这个小案例,实际工作中还有很多可以使用CountDownLatch的场景,比如解析Excel文件时可以同时解析多个Sheet页,所有的Sheet解析完成才算完成了Excel文件的解析。从这个代码中也可以看到CountDownLatch的主要方法就是await和countDown,下面将以这两个方法来分析下CountDownLatch的原理实现。

 2、 源码原理解析

 2.1 await方法

  调用await方法会阻塞当前线程直到计数器的数值为0,方法如下:

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); //共享式获取AQS的同步状态
}

  调用的是AQS的acquireSharedInterruptibly方法:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())//线程中断 说明闭锁对线程中断敏感
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) //闭锁未使用完成 线程进入同步队列自旋等待 
            doAcquireSharedInterruptibly(arg);
    }

  其中tryAcquireShared依赖的是Sync的实现,和之前的ReentrantLockReentrantReadWriteLockSemaphore相比,CountDownLatch的Sync只提供了一种方式,代码如下:

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1; //AQS的同步状态为0则闭锁结束 可以进行下一步操作
        }

  doAcquireSharedInterruptibly方法就不再赘述,和之前Semaphore的实现是一致的,本质上仍然是AQS同步队列的入队自旋等待。

2.2 countDown方法

  调用countDown方法会将计数器的数值减1直到计数器为0,方法如下:

public void countDown() {
        sync.releaseShared(1);
    }

  和Semaphore一样,调用的是AQS的releaseShared方法:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//减少闭锁的计数器
            doReleaseShared();//唤醒后续线程节点
           return true;
        }
        return false;
    }

  其中tryReleaseShared依赖的是Sync的实现,和之前的ReentrantLockReentrantReadWriteLockSemaphore相比,CountDownLatch的Sync只提供了一种方式,代码如下:

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false; //计数器已经是0了
                int nextc = c-1; //计数器减1
                if (compareAndSetState(c, nextc)) //CAS更新同步状态
                    return nextc == 0;
            }
        }

  唤醒后续线程节点的doReleaseShared也不再赘述,和之前Semaphore的实现是一致的。

  总结:CountDownLatch类使用AQS同步状态来表示计数。在await时,所有的线程进入同步队列自旋等待,在countDown时,获取闭锁成功的线程会减少闭锁的计数器,同时唤醒后续线程取获取闭锁,直到await中的计数器为0,获取到闭锁的线程才可以通过,执行下一步操作。

 

参考资料:

https://github.com/lingjiango/ConcurrentProgramPractice

以上是关于一个关于CountDownLatch的并发需求的主要内容,如果未能解决你的问题,请参考以下文章

Java并发工具类之CountDownLatch

Java并发之CountDownLatch工具类

java并发CountDownLatch

Java并发工具类:CountDownLatch

Java并发之CountDownLatch的使用

使用 CountDownLatch 控制多个线程执行顺序