Java并发包中线程并发器

Posted FFStayF

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发包中线程并发器相关的知识,希望对你有一定的参考价值。

一、CountDownLatch

场景:主线程需要等待所有子线程执行完毕后再进行汇总

CountDownLatch实现比较简单,继承AQS实现了一个不可重入共享锁Sync

1.不可重入共享锁Sync

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

//尝试获取锁 仅state==0时才能获取成功
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
//尝试释放锁
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }

2.方法

1)void await()

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);//尝试获取锁,不忽略中断引起的返回
    }

2)boolean await(long timeout, TimeUnit unit)

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));//尝试一定时间内获取锁
    }

3)void countDown()

    public void countDown() {
        sync.releaseShared(1);
    }

3.实例

public class CountDownLatchTest {

    //定义CountDownLatch  实际创建共享锁  且锁已被两个线程持有 state == 2
    private static CountDownLatch countDownLatch = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println("childThreadOne over");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //线程1释放共享锁,state--
                    countDownLatch.countDown();
                }
            }
        });
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println("childThreadTwo over");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //线程2释放共享锁,state--
                    countDownLatch.countDown();
                }
            }
        });
        System.out.println("wait all child thread over");
        //主线程阻塞, 实际尝试获取共享锁 ,仅state == 0时获取成功或被中断打断引起异常
        countDownLatch.await();
        System.out.println("all child thread over");
        pool.shutdown();
    }
}

二、CyclicBarrier回环屏障

和CountDownLatch场景一样,但是CountDownLatch是一次性的,CyclicBarrier可重复使用;实现方式不同,所以使用方式不同,范围更大,见后面实例

CyclicBarrier采用独占锁ReentranLock及条件变量trip(阻塞到达屏障的线程)实现

设置一道屏障,①当线程数小于屏障规定的线程数时,线程入trip条件阻塞队列,线程阻塞;②当线程数等于屏障规定的线程数时,唤醒trip中所有的线程,并重置计数器状态(越过屏障)

另外CyclicBarrier也不忽略中断引起的返回,会抛出异常,屏障会失效,抛错genetation.barrier = true

1)变量与构造方法

    /** 独占锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 条件变量 */
    private final Condition trip = lock.newCondition();
    /** 屏障阻塞的线程个数 */
    private final int parties;
    /* 突破屏障后执行的任务  默认为空 */
    private final Runnable barrierCommand;
    /** 默认false,当前屏障被中断打破后,设置为true,继续使用屏障会抛出异常BrokenBarrierException */
    private Generation generation = new Generation();

    /**
     * 实际计数器  count == 0时突破屏障
     */
    private int count;

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

2.方法

1)int dowait(boolean timed, long nanos)

    /**
     * 主要代码
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                //中断引起的跨过屏障,后续await屏障都会抛错
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                //当前线程被中断,唤醒trip的所有阻塞线程,设置g.broken == true,抛出异常
                breakBarrier();
                throw new InterruptedException();
            }
            //调用一次数据器-1
            int index = --count;
            //当计数器 == 0时,达到屏蔽的线程数,越过屏障
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        //先执行屏障任务
                        command.run();
                    ranAction = true;
                    //唤醒条件变量中所有线程trip.signalAll();
                    //重置计数器count = parties;
                    //重置版本generation = new Generation();
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        //执行屏障任务抛错时,
                        //依然唤醒所有阻塞线程,
                        //但设置g.barrier == true,后续屏障都会抛错
                        breakBarrier();
                }
            }

            // 当计数器 != 0 时,当前线程入条件阻塞队列
            for (;;) {
                try {
                    if (!timed)
                        //无限阻塞
                        trip.await();
                    else if (nanos > 0L)
                        //超时阻塞
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

2)int await()

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

3) int await(long timeout, TimeUnit unit)

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

3.实例

public class CyclicBarrierTest {

    //设置屏障线程数为2  state = 2 
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args){
        ExecutorService pool = Executors.newFixedThreadPool(2);

        pool.submit(new Runnable(){
            @Override
            public void run() {
                try {
                    System.out.println("thread1 step1");
                    //线程1入trip阻塞队列,state--
                    cyclicBarrier.await();
                    System.out.println("thread1 step2");
                    cyclicBarrier.await();
                    System.out.println("thread1 step3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });

        pool.submit(new Runnable(){
            @Override
            public void run() {
                try {
                    System.out.println("thread2 step1");
                    //线程2入trip阻塞队列,state--
                    //与线程1的step1一起导致state == 0,越过屏障唤醒两个线程,state重新设置为2后续逻辑一致
                    cyclicBarrier.await();
                    System.out.println("thread2 step2");
                    cyclicBarrier.await();
                    System.out.println("thread2 step3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });
        pool.shutdown();
    }
}

三、Semaphore

场景:与CountDownLatch一样

信号量同步器设计类似于CountDownLatch,不同的是计数器是递增的

Semaphore不仅实现了公平锁,还实现了非公平锁

 1.共享锁Sync

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }


    /**
     * 非公平锁
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * 公平锁
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

2实例

public class SemaphoreTest {

    //信号量
    private static Semaphore semaphore = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("thread1 over");
                    //释放+1
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("thread2 over");
                    //释放+1
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        //同步
        semaphore.acquire(2);
        System.out.println("all child thread over");
        pool.shutdown();
    }
}

四、总结

1.线程同步的设计类似《操作系统原理》中的进程同步,信号量机制,PV操作

2.CountDownLatch实现线程同步(计数器自减),是一次性的,仅支持公平锁,线程FIFO;

CyclicBarrier实现线程同步(计数器自减),是可复用的(计数器还原),使用独占锁ReentranLock的条件变量trip的阻塞队列实现。

Semaphore实现线程同步(计数器自增),也是可以复用的(计数器归0),提供公平锁与非公平锁实现。

以上是关于Java并发包中线程并发器的主要内容,如果未能解决你的问题,请参考以下文章

Java并发包基石-AQS详解

Java并发包基石-AQS详解

Java并发包基石-AQS详解

JAVA高并发程序设计学习-JDK并发包:同步控制一

Java并发包下锁学习第二篇Java并发基础框架-队列同步器介绍

Java 并发编程实践基础 读书笔记: 第三章 使用 JDK 并发包构建程序