JUC高级多线程_07:读写锁与阻塞队列的具体介绍与使用

Posted ABin-阿斌

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC高级多线程_07:读写锁与阻塞队列的具体介绍与使用相关的知识,希望对你有一定的参考价值。

我是 ABin-阿斌:写一生代码,创一世佳话,筑一揽芳华。 如果小伙伴们觉得我的文章有点 feel ,那就点个赞再走哦。
在这里插入图片描述

1.读写锁

  • Lock 锁不管读和写都会加锁,对于我们的读操作可以不用加锁。 这时我们就可以使用读写锁,读:操作独占锁,写:操作共享锁。
  • 独占锁(写锁) 一次只能被一个线程占有,共享锁(读锁) 多个线程可以同时占有。ReadWriteLock 读-读 可以共存!读-写 不能共存!写-写 不能共存!

1.代码演示:

  • 模拟多个线程同时对一个 Map 进行读、写操作
/**
 * 没加锁的情况下
 */
class MyCache {
    private volatile HashMap<String, Object> map = new HashMap<>();

    /**
     * 写,同时只能一个人
     *
     * @param key
     * @param value
     */
    public void put(String key, Object value) {
        System.out.println(Thread.currentThread().getName() + "写入数据中..." + key);
        map.put(key, value);
        System.out.println(Thread.currentThread().getName() + "写入数据成功!" + value);

    }

    /**
     * 读,可以同时多人
     *
     * @param key
     */
    public void get(String key) {
        System.out.println(Thread.currentThread().getName() + "读取数据中..." + key);
        map.get(key);
        System.out.println(Thread.currentThread().getName() + "读取数据成功!");

    }


}

/**
 * 加锁得情况下
 */
class MyCacheLock {
    private volatile Map<String, Object> map = new HashMap<>();
    //读写锁:更加细粒度的控制
    private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * 存:写入的时候,只希望同时只有一个线程写
     *
     * @param key
     * @param value
     */
    public void put(String key, Object value) {
        //加写锁
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "写入中..." + key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "写入成功!" + key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //解锁
            readWriteLock.writeLock().unlock();
        }
    }

    /**
     * 取:读,所有人都可以同时读取
     *
     * @param key
     */
    public void get(String key) {
        //加读锁
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "读取数据中..." + key);
            map.get(key);
            System.out.println(Thread.currentThread().getName() + "读取数据成功!" + key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //解锁
            readWriteLock.readLock().unlock();
        }
    }
}


public class ReadWriteLockTest {
    public static void main(String[] args) {
        //没有加锁的情况下
//        MyCache myCache = new MyCache();
        MyCacheLock myCacheLock = new MyCacheLock();
        //写入数据
        for (int i = 0; i <= 6; i++) {
            final int temp = i;
            new Thread(() -> {
//                myCache.put(temp + "", temp + "");
                myCacheLock.put(temp + "", temp + "");
            }, String.valueOf(i)).start();
        }
        //读取数据
        for (int i = 0; i <= 6; i++) {
            final int temp = i;
            new Thread(() -> {
//                myCache.get(temp + "");
                myCacheLock.get(temp + "");
            }, String.valueOf(i)).start();
        }
    }
}


  • 没有加锁之前的输出情况
    在这里插入图片描述
    问题: 线程 1 还没有写入完成,其他线程就一起来写,不能保证数据一致性

  • 加锁之后
    在这里插入图片描述

  • 每个线程只有写入完成其他线程才可以陆续开始写数据

2 . 阻塞队列

1. 简介

  • 当遇到不得不阻塞的情况,就需要使用阻塞队列

  • 使用阻塞队列管理阻塞、被阻塞的资源

  • 阻塞队列是一个队列,在数据结构中起的作用如下图:

  • 当队列是空的,从队列中获取元素的操作将会被阻塞

  • 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素

  • 当队列是满的,从队列中添加元素的操作将会被阻塞

  • 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增

2. 作用:

  • 在多线程领域: 所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
  • 好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了
  • concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

3. 架构图

需要重点掌握的:

  1. ArrayBlockingQueue: 由数组结构组成的有界阻塞队列。
  2. LinkedBlockingQueue: 由链表结构组成的有界(但大小默认值为 integer.MAX_VALUE)阻塞队列。
  3. SynchronousQueue: 不存储元素的阻塞队列,也即单个元素的队列。(生产一个,消费一个)

4. 核心方法

5. 代码演示:

public class BlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        //第一组测试:会抛出异常
        //test01();
        //第二组测试:有返回值,不会抛出异常
        //test02();
        //第三组测试:等待,阻塞(一直阻塞)
//        test03();
        //第四组测试:等待,阻塞(等待超时)
        test04();
    }

    /**
     * 第一组测试:会抛出异常
     */
    public static void test01() {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        // 插入成功,不返回,不抛异常
        System.out.println(blockingQueue.add("张三"));
        System.out.println(blockingQueue.add("李四"));
        System.out.println(blockingQueue.add("王五"));
        //添加数超出设置队列数:会报异常:IllegalStateException:Queue full
//        System.out.println(blockingQueue.add("赵六"));

        System.out.println(blockingQueue.remove("张三"));
        System.out.println(blockingQueue.remove("李四"));
        System.out.println(blockingQueue.remove("王五"));
        //当阻塞队列空时,再删除 会报异常:NoSuchElementException
        System.out.println(blockingQueue.remove("赵六"));

        //检查队列队首元素
        System.out.println(blockingQueue.element());
    }

    /**
     * 第二组测试:有返回值,不会抛出异常
     */
    public static void test02() {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        // 插入成功,返回 true
        System.out.println(blockingQueue.offer("张三"));
        System.out.println(blockingQueue.offer("李四"));
        System.out.println(blockingQueue.offer("王五"));

        //当队列满再 offer 会返回 false
        System.out.println(blockingQueue.offer("赵六"));

        //移除成功,返回对应的值
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());

        //当队列满再 poll 会返回 null
        System.out.println(blockingQueue.poll());

        //检查队列队首元素
        System.out.println(blockingQueue.peek());
    }

    /**
     * 第三组测试:等待,阻塞(一直阻塞)
     */
    public static void test03() throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        // 一直阻塞
        blockingQueue.put("张三");
        blockingQueue.put("李四");
        blockingQueue.put("王五");

        //当队列满再 put 会一直阻塞,不会返回,不会抛异常
        //直到有空位
        blockingQueue.put("赵六");

        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());

        //当队列满再 take 会一直阻塞,不会返回,不会抛异常
        //直到有值插入
        System.out.println(blockingQueue.take());


    }

    /**
     * 第四组测试:等待,阻塞(等待超时)
     */
    public static void test04() throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        // 插入成功,不返回,不抛异常
        System.out.println(blockingQueue.offer("张三"));
        System.out.println(blockingQueue.offer("李四"));
        System.out.println(blockingQueue.offer("王五"));

        //当队列满再 take 会阻塞一段时间,一段时间会返回 true / false
        System.out.println(blockingQueue.offer("赵六",3,TimeUnit.SECONDS));


        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
    }
}

6.同步队列

注意事项:

  • 同步队列和其他的 BlockingQueue 不一样,SynchronousQueue 不存储元素
  • 添加(put)了一个元素,必须从里面先(take)取出来,否则不能在put进去值

代码演示:

public class SynchronousQueueTest {
    public static void main(String[] args) {
        //同步队列
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "添加一号成功!");
                blockingQueue.put("一号");
                System.out.println(Thread.currentThread().getName() + "添加二号成功!");
                blockingQueue.put("二号");
                System.out.println(Thread.currentThread().getName() + "添加三号成功!");
                blockingQueue.put("三号");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程一:").start();

        new Thread(() -> {
            try {
                //阻塞三秒
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "取出成功" + blockingQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "取出成功" + blockingQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "取出成功" + blockingQueue.take());
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程二:").start();
    }
}

以上是关于JUC高级多线程_07:读写锁与阻塞队列的具体介绍与使用的主要内容,如果未能解决你的问题,请参考以下文章

JUC高级多线程_10:Future异步回调的具体介绍与使用

JUC高级多线程_09:ForkJoin框架的具体介绍与使用

JUC高级多线程_11:JMM与Volatile的具体介绍与使用

线程高级操作

JUC多线程:阻塞队列ArrayBlockingQueue与LinkedBlockingQueue

JUC并发编程 详解锁与队列