Java 多线程进阶-并发协作控制

Posted Sweet小马

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 多线程进阶-并发协作控制相关的知识,希望对你有一定的参考价值。

Java 多线程进阶-并发协作控制

  • 线程协作对比

    • Thread/Executor/Fork-Join
      • 线程启动, 运行, 结束.
      • 线程之间缺少协作.
    • synchronized 同步
      • 互斥, 限定只有一个线程才能进入关键区.
      • 简单粗暴, 性能损失有点大>_<.
  • Lock 锁

    • Lock 也可以实现同步的效果
      • 实现更复杂的临界区结构.
      • tryLock 方法可以预判锁是否空闲.
      • 允许分离读写的操作, 多读单写.
      • 性能更好.
    • ReentrantLock 类, 可重入的互斥锁.
    • RenntrantReadWriteLock 类, 可重入的读写锁.
    • tryLock()/lock()/unlock() 函数.
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.concurrent.locks.ReentrantLock;
        import java.util.concurrent.locks.ReentrantReadWriteLock;
    
        public class LockExample {
            private static final ReentrantLock queueLock111 = new ReentrantLock(); // 可重入锁
            private static final ReentrantReadWriteLock orderLock111 = new ReentrantReadWriteLock(); // 可重入读写锁
    
            /**
             * 学校门口有家奶茶店, 学生们点单有时需要排队
             * 1. 买奶茶
             * 假设想买奶茶的同学如果看到需要排队, 就决定不买了
             * (一次只有一个买)
             * <p>
             * 2. 操作奶茶账本
             * 假设奶茶店有老板和多名员工, 记录方式比较原始, 只有一个订单本
             * (多个读, 一个写)
             * 老板负责写新订单, 员工不断查看订单本得到信息来制作奶茶, 在老板写新订单的时候员工不能查看订单本
             * (写时, 不能读)
             * 多个员工可以同时查看订单本, 此时老板不能写新订单
             * (读时, 不能写)
             *
             * @param args 1
             */
            public static void main(String[] args) throws InterruptedException {
                // 1. 买奶茶的例子
                buyMilkTea(); // 使用可重入锁
                // 2. 操作奶茶账本的例子
                handleOrder(); // 使用读写锁
            }
    
            public static void buyMilkTea() throws InterruptedException {
                LockExample lockExample = new LockExample();
                int STUDENTS_COUNT = 10;
                Thread[] students = new Thread[STUDENTS_COUNT];
                for (int i = 0; i < students.length; i++) {
                    students[i] = new Thread(new Runnable() { // 匿名的线程类, 没有名字的
                        @Override
                        public void run() {
                            try {
                                long walkingTime = (long) (Math.random() * 1000);
                                Thread.sleep(walkingTime);
                                lockExample.tryToBuyMilk();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                    students[i].start();
                }
    
                for (Thread student : students) {
                    student.join();
                }
            }
    
            private void tryToBuyMilk() throws InterruptedException {
                boolean flag = true;
                while (flag) {
                    if (queueLock111.tryLock()) { // 查一下现在是否锁住, 锁住了在下面的flag地方等一下再来操作.
                        // tryLock()实际包含了两个操作, 先 try 再 lock; 如果没有锁再锁住.
                        long thinkingTime = (long) (Math.random() * 500);
                        Thread.sleep(thinkingTime);
                        System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 来一杯珍珠奶茶, 不要珍珠");
                        flag = false;
                        queueLock111.unlock();
                    } else {
                        System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 再等等");
                    }
                    if (flag) {
                        Thread.sleep(1000);
                    }
                }
            }
    
            /**
             * 处理订单
             */
            static void handleOrder() {
                LockExample lockExample = new LockExample();
    
                Thread boss = new Thread(() -> {
                    while (true) {
                        try {
                            lockExample.addOrder(); // 老板加新单子
                            long waitingTime = (long) (Math.random() * 1000);
                            Thread.sleep(waitingTime);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
                boss.start();
    
                int workerCount = 3;
                Thread[] workers = new Thread[workerCount];
                for (int i = 0; i < workerCount; i++) {
                    workers[i] = new Thread(new Runnable() {
                        @Override
                        public void run() {
                            while (true) {
                                try {
                                    lockExample.viewOrder(); // 员工取出单子
                                    long workingTime = (long) (Math.random() * 5000);
                                    Thread.sleep(workingTime);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    });
                    workers[i].start();
                }
            }
    
            /**
             * 向订单本录入新订单
             */
            private void addOrder() throws InterruptedException {
                orderLock111.writeLock().lock(); // writeLock 写锁, 排他的, 只能一个线程拥有
                long writingTime = (long) (Math.random() * 1000);
                Thread.sleep(writingTime);
                System.out.println(LocalDateTime.now() + " => " + "老板新加一个订单");
                orderLock111.writeLock().unlock();
            }
    
            /**
             * 查看订单本
             */
            private void viewOrder() throws InterruptedException {
                orderLock111.readLock().lock(); // readLock 读锁, 可以多个线程共享(同时访问)
    
                long readingTime = (long) (Math.random() * 500);
                Thread.sleep(readingTime);
                System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 查看了订单本");
                orderLock111.readLock().unlock();
            }
    
        }
    
  • Semaphore 信号量

    • 由1965年Dijkstra提出的.
    • 信号量: 本质上是一个计数器.
    • 计数器大于0, 可以使用, 等于0不能使用.
    • 可以设置多个并发量, 例如限制10个访问.
    • Semaphore
      • acquire 获取.
      • release 释放.
    • 比 Lock 更进一步, 可以控制多个同时访问关键区.
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.concurrent.Semaphore;
        import java.util.concurrent.TimeUnit;
    
        /**
         * 控制同时访问代码块的线程数
         * 现在有一个地下车库, 共有5个车位, 有10辆车需要停放, 每次停放时, 去申请信号量
         */
        public class SemaphoreExample {
            private final Semaphore placeSemaphore = new Semaphore(5);
    
            public static void main(String[] args) throws InterruptedException {
                SemaphoreExample example = new SemaphoreExample();
    
                int tryToParkCount = 10;
                Thread[] parkers = new Thread[tryToParkCount];
                for (int i = 0; i < parkers.length; i++) {
                    parkers[i] = new Thread(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                long randomTime = (long) (Math.random() * 1000);
                                Thread.sleep(randomTime); // 过一段时间来停车
                                if (example.parking()) {
                                    long parkingTime = (long) (Math.random() * 1200);
                                    Thread.sleep(parkingTime); // 停一段时间离开
                                    example.leaving();
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                    parkers[i].start();
                }
    
                for (Thread t : parkers) {
                    t.join();
                }
    
    
                TimeUnit.SECONDS.sleep(60);
            }
    
            private boolean parking() {
                if (placeSemaphore.tryAcquire()) { // 查看是否有剩余的信号量(剩余的车位)
                    System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 停车成功!");
                    return true;
                } else {
                    System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 没有空位");
                    return false;
                }
            }
    
            private void leaving() {
                placeSemaphore.release();
                System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 开走了");
            }
        }
    
  • Latch 等待锁

    • 是一个同步辅助类.
    • 用来同步执行任务的一个或多个线程
    • 不是用来保护临界区或共享资源
    • CountDownLatch
      • countDown() 计数减一.
      • await() 等待 latch 变成 0.
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.concurrent.CountDownLatch;
    
        /**
         * 设想百米赛跑, 发令枪发出信号后选手开始跑, 全部选手跑到终点后比赛结束.
         */
        public class CountDownLatchExample {
            public static void main(String[] args) throws InterruptedException {
                CountDownLatch startSignal = new CountDownLatch(1);
                CountDownLatch doneSignal = new CountDownLatch(10);
    
                int runnerCount = 10; // 选手数量
                for (int i = 0; i < runnerCount; i++) { // create and start threads
                    new Thread(new Runner(startSignal, doneSignal)).start(); // 所有选手开始跑~
                }
                System.out.println(LocalDateTime.now() + " => " + "准备就绪..");
                startSignal.countDown(); // let all threads proceed
                System.out.println(LocalDateTime.now() + " => " + "比赛开始!");
                doneSignal.await(); // wait for all threads to finish
                System.out.println(LocalDateTime.now() + " => " + "比赛结束!");
            }
        }
    
        class Runner implements Runnable {
            private final CountDownLatch startSignal;
            private final CountDownLatch doneSignal;
    
            Runner(CountDownLatch startSignal, CountDownLatch doneSignal) {
                this.startSignal = startSignal;
                this.doneSignal = doneSignal;
            }
    
            @Override
            public void run() {
                try {
                    startSignal.await();
                    doWork();
                    doneSignal.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            private void doWork() throws InterruptedException {
                long time = (long) (Math.random() * 10 * 1000);
                Thread.sleep(time); // 随机在十秒内跑完
                System.out.printf(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 跑完全程, 用时 %d 秒 
    ", time/1000);
            }
        }
    
  • Barrier/?b?ri?r/ n.障碍物

    • 集合点, 也是一个同步辅助类
    • 允许多个线程在某一个点上进行同步
    • CyclicBarrier
      • 构造函数是需要同步的线程数量.
      • await 等待其他线程, 到达数量后就放行.
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.concurrent.BrokenBarrierException;
        import java.util.concurrent.CyclicBarrier;
    
        /**
         * Barrier/?b?ri?r/ n.障碍物
         * 假定有三行数字, 用三个线程分别计算每一行的和, 最后计算综合
         */
        public class BarrierExample {
            public static void main(String[] args) {
                int rowCount = 3;
                int colCount = 5;
                final int[][] numbers = new int[rowCount][colCount];
                final int[] results = new int[rowCount];
                numbers[0] = new int[]{1, 2, 3, 4, 5};
                numbers[1] = new int[]{6, 7, 8, 9, 10};
                numbers[2] = new int[]{11, 12, 13, 14, 15};
    
                CalcFinalSum111 finalResult = new CalcFinalSum111(results);
                CyclicBarrier cyclicBarrier = new CyclicBarrier(rowCount, finalResult);
                // 当有3个线程在 barrier上await时, 就执行最终计算
    
                for (int i = 0; i < rowCount; i++) {
                    CalcRowSum111 eachRow = new CalcRowSum111(numbers, i, results, cyclicBarrier);
                    new Thread(eachRow).start();
                }
    
            }
        }
    
        class CalcRowSum111 implements Runnable {
            final int[][] numbers;
            final int rowNumber;
            final int[] result;
            final CyclicBarrier barrier;
    
            CalcRowSum111(int[][] numbers, int rowNumber, int[] result, CyclicBarrier barrier) {
                this.numbers = numbers;
                this.rowNumber = rowNumber;
    
                this.result = result;
                this.barrier = barrier;
            }
    
            @Override
            public void run() {
                int[] row = numbers[rowNumber];
                int sum = 0;
                for (int data : row) {
                    sum += data;
                    result[rowNumber] = sum;
                }
                try {
                    System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 计算第" + (rowNumber + 1) + "行结束, 结果为: " + sum);
                    barrier.await(); // 等待! 只要超过(Barrier的构造参数填入的数量)的个数, 就放行
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    
        class CalcFinalSum111 implements Runnable {
            final int[] eachRowResult;
            int finalResult;
    
            CalcFinalSum111(int[] eachRowResult) {
                this.eachRowResult = eachRowResult;
            }
    
            @Override
            public void run() {
                int sum = 0;
                for (int data : eachRowResult) {
                    sum += data;
                }
                finalResult = sum;
                System.out.println(LocalDateTime.now() + " => " + "最终结果为: " + finalResult);
            }
        }
    
  • Phaser 阶段性控制多个线程

    • 允许执行并发多阶段任务, 同步辅助类.
    • 在每一个阶段结束的位置对线程进行同步, 当所有的线程都到达这一步, 再进行下一步.
    • Phaser
      • arrive()
      • arriveAndAwaitAdvance()
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.concurrent.Phaser;
    
        /**
         * 假设举行考试, 总共三道大题, 每次下发一道题目, 等所有学生都完成之后再进行下一道题
         */
        public class PhaserExample {
            public static void main(String[] args) {
                int studentCount = 5;
                Phaser phaser = new Phaser(studentCount);
    
                for (int i = 0; i < studentCount; i++) {
                    new Thread(null, new Student111(phaser), "学生" + i).start();
                }
            }
        }
    
        class Student111 implements Runnable {
            private final Phaser phaser;
    
            Student111(Phaser phaser) {
                this.phaser = phaser;
            }
    
            @Override
            public void run() {
                try {
                    doTesting(1);
                    phaser.arriveAndAwaitAdvance(); // 等到所有线程都到达了, 才放行
                    doTesting(2);
                    phaser.arriveAndAwaitAdvance();
                    doTesting(3);
                    phaser.arriveAndAwaitAdvance();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            private void doTesting(int i) throws InterruptedException {
                String name = Thread.currentThread().getName();
                System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + "开始答第" + i + "题");
                long thinkingTime = (long) (Math.random() * 1000);
                Thread.sleep(thinkingTime); // 模拟学生答题时间
                System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + "第" + i + "道题答题结束");
            }
        }
    
  • Exchanger 两个线程间交换数据

    • 允许在并发线程中互相交换消息.
    • 允许在2个线程中定义同步点, 当两个线程都到达同步点, 他们交换数据结构
    • Exchanger
      • exchange(), 线程双方互相交互数据
      • 交换数据是双向的
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.Scanner;
        import java.util.concurrent.Exchanger;
    
        /**
         * 通过Exchanger实现学生成绩查询, 两个线程间简单的数据交换,
         * 把自己线程的内容输出给另一个线程(只能简单的双向传送, 不能向MPI一样随意点对点的传输, 线程1给线程3 线程3向线程2...这样)
         */
        public class ExchangerExample {
            public static void main(String[] args) throws InterruptedException {
                Exchanger<String> exchanger = new Exchanger<>();
                BackgroundWorker111 backgroundWorker111 = new BackgroundWorker111(exchanger);
                new Thread(backgroundWorker111).start();
    
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    System.out.println(LocalDateTime.now() + " => " + "请输入要查询的学生名字:");
                    String input = scanner.nextLine().trim();
                    exchanger.exchange(input);
                    String exResult = exchanger.exchange(null); // 拿到线程反馈的结果
                    // 当两个线程都同时执行到同一个exchanger.exchange()方法, 两个线程就互相交换数据, 交换是双向的.
                    if ("exit".equals(exResult)) {
                        System.out.println(LocalDateTime.now() + " => " + "退出查询~");
                        break;
                    }
                    System.out.println(LocalDateTime.now() + " => " + "查询结果: " + exResult);
                }
            }
        }
    
        class BackgroundWorker111 implements Runnable {
            final Exchanger<String> exchanger;
    
            BackgroundWorker111(Exchanger<String> exchanger) {
                this.exchanger = exchanger;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        String item = exchanger.exchange(null);
                        switch (item) {
                            case "zhangsan":
                                exchanger.exchange("90");
                                break;
                            case "lisi":
                                exchanger.exchange("80");
                                break;
                            case "wangwu":
                                exchanger.exchange("70");
                                break;
                            case "exit":
                                exchanger.exchange("exit");
                                return; // 退出run, 即结束当前线程
                            default:
                                exchanger.exchange("no body!");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
  • 总结

    • java.util.concurrent 包提供了很多并发编程的控制协作类.
    • 根据业务特点, 使用正确的线程并发控制协作.

以上是关于Java 多线程进阶-并发协作控制的主要内容,如果未能解决你的问题,请参考以下文章

JDK并发工具之多线程团队协作:同步控制

Java 多线程进阶-并发编程 线程组ThreadGroup

Java并发编程线程间协作(上)

Java多线程系列:线程的五大状态,以及线程之间的通信与协作

Java多线程系列:线程的五大状态,以及线程之间的通信与协作

Java多线程并发控制