JUC高级多线程_07:读写锁与阻塞队列的具体介绍与使用
Posted ABin-阿斌
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC高级多线程_07:读写锁与阻塞队列的具体介绍与使用相关的知识,希望对你有一定的参考价值。
我是 ABin-阿斌:写一生代码,创一世佳话,筑一揽芳华。 如果小伙伴们觉得我的文章有点 feel ,那就点个赞再走哦。
1.读写锁
- Lock 锁不管读和写都会加锁,对于我们的读操作可以不用加锁。 这时我们就可以使用读写锁,读:操作独占锁,写:操作共享锁。
- 独占锁(写锁) 一次只能被一个线程占有,共享锁(读锁) 多个线程可以同时占有。ReadWriteLock 读-读 可以共存!读-写 不能共存!写-写 不能共存!
1.代码演示:
- 模拟多个线程同时对一个 Map 进行读、写操作
/**
* 没加锁的情况下
*/
class MyCache {
private volatile HashMap<String, Object> map = new HashMap<>();
/**
* 写,同时只能一个人
*
* @param key
* @param value
*/
public void put(String key, Object value) {
System.out.println(Thread.currentThread().getName() + "写入数据中..." + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入数据成功!" + value);
}
/**
* 读,可以同时多人
*
* @param key
*/
public void get(String key) {
System.out.println(Thread.currentThread().getName() + "读取数据中..." + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取数据成功!");
}
}
/**
* 加锁得情况下
*/
class MyCacheLock {
private volatile Map<String, Object> map = new HashMap<>();
//读写锁:更加细粒度的控制
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
/**
* 存:写入的时候,只希望同时只有一个线程写
*
* @param key
* @param value
*/
public void put(String key, Object value) {
//加写锁
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入中..." + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入成功!" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
//解锁
readWriteLock.writeLock().unlock();
}
}
/**
* 取:读,所有人都可以同时读取
*
* @param key
*/
public void get(String key) {
//加读锁
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取数据中..." + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取数据成功!" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
//解锁
readWriteLock.readLock().unlock();
}
}
}
public class ReadWriteLockTest {
public static void main(String[] args) {
//没有加锁的情况下
// MyCache myCache = new MyCache();
MyCacheLock myCacheLock = new MyCacheLock();
//写入数据
for (int i = 0; i <= 6; i++) {
final int temp = i;
new Thread(() -> {
// myCache.put(temp + "", temp + "");
myCacheLock.put(temp + "", temp + "");
}, String.valueOf(i)).start();
}
//读取数据
for (int i = 0; i <= 6; i++) {
final int temp = i;
new Thread(() -> {
// myCache.get(temp + "");
myCacheLock.get(temp + "");
}, String.valueOf(i)).start();
}
}
}
-
没有加锁之前的输出情况
问题: 线程 1 还没有写入完成,其他线程就一起来写,不能保证数据一致性 -
加锁之后
-
每个线程只有写入完成其他线程才可以陆续开始写数据
2 . 阻塞队列
1. 简介
-
当遇到不得不阻塞的情况,就需要使用阻塞队列
-
使用阻塞队列管理阻塞、被阻塞的资源
-
阻塞队列是一个队列,在数据结构中起的作用如下图:
-
当队列是空的,从队列中获取元素的操作将会被阻塞
-
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
-
当队列是满的,从队列中添加元素的操作将会被阻塞
-
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
2. 作用:
- 在多线程领域: 所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
- 好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了
- 在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
3. 架构图
需要重点掌握的:
- ArrayBlockingQueue: 由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue: 由链表结构组成的有界(但大小默认值为 integer.MAX_VALUE)阻塞队列。
- SynchronousQueue: 不存储元素的阻塞队列,也即单个元素的队列。(生产一个,消费一个)
4. 核心方法
5. 代码演示:
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
//第一组测试:会抛出异常
//test01();
//第二组测试:有返回值,不会抛出异常
//test02();
//第三组测试:等待,阻塞(一直阻塞)
// test03();
//第四组测试:等待,阻塞(等待超时)
test04();
}
/**
* 第一组测试:会抛出异常
*/
public static void test01() {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
// 插入成功,不返回,不抛异常
System.out.println(blockingQueue.add("张三"));
System.out.println(blockingQueue.add("李四"));
System.out.println(blockingQueue.add("王五"));
//添加数超出设置队列数:会报异常:IllegalStateException:Queue full
// System.out.println(blockingQueue.add("赵六"));
System.out.println(blockingQueue.remove("张三"));
System.out.println(blockingQueue.remove("李四"));
System.out.println(blockingQueue.remove("王五"));
//当阻塞队列空时,再删除 会报异常:NoSuchElementException
System.out.println(blockingQueue.remove("赵六"));
//检查队列队首元素
System.out.println(blockingQueue.element());
}
/**
* 第二组测试:有返回值,不会抛出异常
*/
public static void test02() {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
// 插入成功,返回 true
System.out.println(blockingQueue.offer("张三"));
System.out.println(blockingQueue.offer("李四"));
System.out.println(blockingQueue.offer("王五"));
//当队列满再 offer 会返回 false
System.out.println(blockingQueue.offer("赵六"));
//移除成功,返回对应的值
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//当队列满再 poll 会返回 null
System.out.println(blockingQueue.poll());
//检查队列队首元素
System.out.println(blockingQueue.peek());
}
/**
* 第三组测试:等待,阻塞(一直阻塞)
*/
public static void test03() throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
// 一直阻塞
blockingQueue.put("张三");
blockingQueue.put("李四");
blockingQueue.put("王五");
//当队列满再 put 会一直阻塞,不会返回,不会抛异常
//直到有空位
blockingQueue.put("赵六");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//当队列满再 take 会一直阻塞,不会返回,不会抛异常
//直到有值插入
System.out.println(blockingQueue.take());
}
/**
* 第四组测试:等待,阻塞(等待超时)
*/
public static void test04() throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
// 插入成功,不返回,不抛异常
System.out.println(blockingQueue.offer("张三"));
System.out.println(blockingQueue.offer("李四"));
System.out.println(blockingQueue.offer("王五"));
//当队列满再 take 会阻塞一段时间,一段时间会返回 true / false
System.out.println(blockingQueue.offer("赵六",3,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
}
6.同步队列
注意事项:
- 同步队列和其他的 BlockingQueue 不一样,SynchronousQueue 不存储元素
- 添加(put)了一个元素,必须从里面先(take)取出来,否则不能在put进去值
代码演示:
public class SynchronousQueueTest {
public static void main(String[] args) {
//同步队列
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "添加一号成功!");
blockingQueue.put("一号");
System.out.println(Thread.currentThread().getName() + "添加二号成功!");
blockingQueue.put("二号");
System.out.println(Thread.currentThread().getName() + "添加三号成功!");
blockingQueue.put("三号");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程一:").start();
new Thread(() -> {
try {
//阻塞三秒
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "取出成功" + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "取出成功" + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "取出成功" + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程二:").start();
}
}
以上是关于JUC高级多线程_07:读写锁与阻塞队列的具体介绍与使用的主要内容,如果未能解决你的问题,请参考以下文章
JUC高级多线程_10:Future异步回调的具体介绍与使用
JUC高级多线程_09:ForkJoin框架的具体介绍与使用
JUC高级多线程_11:JMM与Volatile的具体介绍与使用