Java并发——多线程并发编程中的同步器

Posted 守夜人爱吃兔子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发——多线程并发编程中的同步器相关的知识,希望对你有一定的参考价值。

1. CyclicBarrier

  • CountDownLatch 类似
  • 线程会等待,直到足够多线程达到了事先规定的数据。一旦触发条件,就可以进行下一步的操作
  • 适用于线程之间相互等待处理结果就绪的场景
  • CyclicBarrier 可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就会被撤销,所有线程再统一出发,继续执行剩下的任务

用法一:等待所有人到达指定地点,再统一出发

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到了,大家统一出发");
            }
        });

        for (int i = 0; i < 5; i++) {
            new Thread(new Task(cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable {

        private final CyclicBarrier cyclicBarrier;

        public Task(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " 现在前往集合地点");
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println(Thread.currentThread().getName() + " 已经到了集合地点,等待其他人到达");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

用法二:CyclicBarrier简单使用示例

2. CountDownLatch

  • CyclicBarrier 类似,数量递减到0时,触发动作
  • 但是不可重复使用

主要方法:

  1. CountDownLatch(int count),仅有一个构造函数,参数 count 为需要倒数的数值
  2. await(),调用 await() 方法的现场会被挂起,它会等待直到 count 值为0才继续执行
  3. countDown(),讲 count 值减1,直到为0时,等待的现场会被唤起

用法一:一个线程等待多个线程都执行完毕,再继续自己的工作(一等多)

import java.util.concurrent.*;

public class Test {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("Num:" + no);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                }
            };
            service.submit(runnable);
        }
        System.out.println("等待5个线程执行完....");
        latch.await();
        System.out.println("所有线程都已经执行完,可以进入下一个环节了。");
    }
}

用法二:多个线程等待某一个线程的信号,然后同时开始执行(多等一)

import java.util.concurrent.*;

/**
 * 模拟跑步比赛,5名选手等待1名裁判发令,发令后所有人同时开始跑步
 */
public class Test {

    public static void main(String[] args) throws InterruptedException {
        // 1名裁判
        CountDownLatch latch = new CountDownLatch(1);
        // 5名选手
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("NO." + no + "号选手准备完毕,等待发令");
                        latch.await();
                        System.out.println("NO." + no + "开始跑步");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.submit(runnable);
        }
        System.out.println("裁判检查发令枪....");
        Thread.sleep(2000);
        System.out.println("裁判检查完毕,比赛开始....");
        latch.countDown();
    }
}

CyclicBarrier 和 CountDownLatch 的区别

  • 作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需等待数字到0,也就是说,CountDownLatch用于事件,但是CyclicBarrier是用于线程的
  • 可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例;而CyclicBarrier可以重复使用

3. Semaphore

  • 信号量,可以通过控制“许可证”的数量,来保证线程之间的配合
  • 线程只有在拿到“许可证”后才能继续运行,相比于其他的同步器,更灵活

主要方法:

  1. new Semaphore(int permits, boolean fair),这里可以设置是否要设置公平策略,如果传入true,那么Semaphore会把之前等待的线程放到FIFO的队列里,以便于当有了新的许可证,可以分发给之前等了最长时间的线程;
  2. acquire(),获取许可证,没有的话会陷入阻塞状态;
  3. acquireUninterruptibly(),同上,但是可以响应中断;
  4. tryAcquire(),看看现在有没有空闲的许可证,如果有的话就获取,没有的话,不用陷入阻塞状态,可以去做其它的事,过一会再来查看许可证的空闲情况
  5. tryAcquire(long timeout, TimeUnit unit),和 tryAcquire() 一样,但是多了一个超时时间,比如“在3秒内获取不到许可证,就去做别的事情”
  6. release(),操作完成后,归还许可证
  7. 可以指定获取和释放的许可证数量,获取和释放的数量必须一致

用法:

import java.util.concurrent.*;

public class Test {

    static Semaphore semaphore = new Semaphore(3, true);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 100; i++) {
            service.submit(new Task());
        }
        service.shutdown();
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + " 拿到了许可证");

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + " 释放了许可证");
            semaphore.release();
        }
    }
}

4. Condition

  • 控制线程的“等待”和“唤醒”,Object.wait()的升级版

实现生产者消费者

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {

    private final int queueSize = 10;
    private final PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo demo = new ConditionDemo();
        Produce produce = demo.new Produce();
        Consume consume = demo.new Consume();
        produce.start();
        consume.start();
    }

    /**
     * 消费者
     */
    class Consume extends Thread {
        @Override
        public void run() {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void consume() throws InterruptedException {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        notEmpty.await();
                    }

                    // 从队列中取数据
                    queue.poll();
                    // 并且唤醒生产者
                    notFull.signal();

                    System.out.println("从队列中取走了一条数据,队列剩余:" + queue.size());

                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * 生产者
     */
    class Produce extends Thread {
        @Override
        public void run() {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        private void produce() throws InterruptedException {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待消费者进行消费");
                        notFull.await();
                    }

                    // 开始生产数据
                    queue.offer(1);
                    // 并且唤醒消费者
                    notEmpty.signal();

                    System.out.println("向队列添加了一条数据,队列剩余:" + queue.size());

                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

5. Phaser

  • CyclicBarrier 类似

示例:4个线程执行完后做一次同步操作

import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class PhaserTest {

    Phaser phaser = new Phaser();
    ExecutorService executorService = Executors.newCachedThreadPool();

    class Worker implements Runnable {
        @Override
        public void run() {
            phaser.register();
            while (true) {
                try {
                    Thread.sleep(500);
                    System.out.println("working:" + phaser.getPhase());
                    phaser.arriveAndAwaitAdvance();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public void run() throws InterruptedException {
        phaser.register();
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        executorService.execute(new Worker());
        while (true) {
            phaser.arriveAndAwaitAdvance();
            System.out.println("Sync..." + phaser.getPhase());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        var test = new PhaserTest();
        test.run();
    }
}

6. Exchanger

  • 让两个线程在合适时交换数据
  • 适用场景:当两个线程工作在同一个类的不同实例上时,用于交换数据
  • 解决了什么问题:线程间高效交换数据

最后

最近我整理了整套《JAVA核心知识点总结》,说实话 ,作为一名Java程序员,不论你需不需要面试都应该好好看下这份资料。拿到手总是不亏的~我的不少粉丝也因此拿到腾讯字节快手等公司的Offer

Java进阶之路群,找管理员获取哦-!

以上是关于Java并发——多线程并发编程中的同步器的主要内容,如果未能解决你的问题,请参考以下文章

Java线程与并发编程实践----同步器(Phaser)

Java多线程编程——对象及变量的并发访问

Java多线程编程——对象及变量的并发访问

Java并发编程基础(入门篇)

Java线程与并发编程实践----同步器(交换器信号量)

Java线程与并发编程实践----同步