多线程案例-阻塞式队列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程案例-阻塞式队列相关的知识,希望对你有一定的参考价值。
1.什么是阻塞队列
阻塞队列是一种特殊的队列,在"先进先出"的原则下又引入了"阻塞"功能
阻塞队列能是一种线程安全的数据结构,具有以下特性:
当队列满的时候,继续入队列就会阻塞,直到其它线程从队列中取走元素
当队列空的时候,继续出队列就会阻塞,直到其它队列向队列中插入元素
阻塞式队列的典型应用场景是"生产者消费者模型"
2.生产者消费者模型
生产者消费者模型通过一个容器来解决生产者和消费者的强耦合问题,生产者与消费者之间不直接通讯,通过阻塞队列来通讯,生产数据后放入阻塞队列,消费者可以直接从阻塞队列获取数据
生产者消费者模型能带来两个非常重要的好处:
1.阻塞队列能使生产者和消费者之间解耦
2.阻塞队列起到缓冲的作用,平衡了生产者消费者的处理能力
1.开发中典型的场景:服务器之间的相互调用
当客户端程序向A服务器发起一个请求后,A服务器将请求转发给B服务器处理,然后B服务器处理完成后将结果返回,此时可以视为:A调用了B
这种场景下,AB两个服务器的耦合程度是比较高的!!如果B服务器出现问题,也会引起A的bug,不仅如此,如果A再需要调用C服务器,还需要修改A的代码,非常麻烦..
针对这种场景使用生产者消费者模型,能有效降低耦合
此时,AB之间的耦合就降低很多了,AB都只知道队列的存在,A的代码不与B相关,B的代码也不与A相关,AB任何一方出现问题不会影响到另一方
2.服务器开发中,用户发送的请求的数量是不可控的,如果没有充分的准备并且请求量超过了服务器的承受范围,服务器有可能直接被大量的请求冲垮.例如"秒杀"这种场景,服务器就会同一时刻受到大量请求,这个时候可以把这些请求放到阻塞队列中,让线程慢慢处理,能有效防止服务器被大量的请求冲垮
在Java标准中内置有阻塞队列BlockingQueue,是一个接口,实现类是LinkedBlockingQueue,put方法是入队列,take是出队列.这两个方法具有阻塞特性
下面使用标准库中的阻塞队列实现生产者消费者模型
public class ThreadDemo1
public static void main(String[] args)
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
//消费者
Thread customer = new Thread(()->
while(true)
try
int value = queue.take();
System.out.println("消费元素: "+value);
catch (InterruptedException e)
throw new RuntimeException(e);
,"消费者");
customer.start();
//生产者
Thread producer = new Thread(()->
Random random = new Random();
while(true)
try
Thread.sleep(500);
catch (InterruptedException e)
throw new RuntimeException(e);
int num = random.nextInt(1000);
System.out.println("生产元素: "+num);
try
queue.put(num);
catch (InterruptedException e)
throw new RuntimeException(e);
,"生产者");
producer.start();
try
customer.join();
producer.join();
catch (InterruptedException e)
throw new RuntimeException(e);
public class ThreadDemo1
public static void main(String[] args)
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
//消费者
Thread customer = new Thread(()->
while(true)
try
int value = queue.take();
System.out.println("消费元素: "+value);
catch (InterruptedException e)
throw new RuntimeException(e);
,"消费者");
customer.start();
//生产者
Thread producer = new Thread(()->
Random random = new Random();
while(true)
try
Thread.sleep(500);
catch (InterruptedException e)
throw new RuntimeException(e);
int num = random.nextInt(1000);
System.out.println("生产元素: "+num);
try
queue.put(num);
catch (InterruptedException e)
throw new RuntimeException(e);
,"生产者");
producer.start();
try
customer.join();
producer.join();
catch (InterruptedException e)
throw new RuntimeException(e);
结果
我们可以发现,生产和消费是成对出现的,程序开始运行,因为在生产者线程中让线程休眠500ms后再执行,此时阻塞队列中为空,而消费者线程要再阻塞队列中使用take方法取元素,就会陷入阻塞状态,等到阻塞队列中有生产者插入元素后才继续执行取元素!!
接下来通过"循环队列"来实现一个阻塞队列
3.阻塞队列实现
实现阻塞队列是要实现一个普通队列然后加上"阻塞功能"
这里我们使用循环队列实现阻塞队列,下面是循环队列的三种状态
我们先写一个普通的循环队列的入队和出队操作
class MyBlockingQueue
private int[] items = new int[1000];
private int head = 0;
private int tail = 0;
private int size = 0;
//入队列
public void put(int value)
if(size == items.length)
//队列满了.不能插入
return;
items[tail] = value;
tail++;
//针对tail的处理
//1)这个写法非常常见
//tail = (tail+1)%items.length;
//2)可读性好并且比求余的代码效率高
if(tail >= items.length)
tail = 0;
//插入成功
size++;
//出队列
public Integer take()
if(size == 0)
//队列为空,不能出队
return null;
int result = items[head];
head++;
if(head >= items.length)
head = 0;
size--;
return result;
class MyBlockingQueue
private int[] items = new int[1000];
private int head = 0;
private int tail = 0;
private int size = 0;
//入队列
public void put(int value)
if(size == items.length)
//队列满了.不能插入
return;
items[tail] = value;
tail++;
//针对tail的处理
//1)这个写法非常常见
//tail = (tail+1)%items.length;
//2)可读性好并且比求余的代码效率高
if(tail >= items.length)
tail = 0;
//插入成功
size++;
//出队列
public Integer take()
if(size == 0)
//队列为空,不能出队
return null;
int result = items[head];
head++;
if(head >= items.length)
head = 0;
size--;
return result;
我们在队列中插入几个元素并取出
public static void main(String[] args)
MyBlockingQueue queue = new MyBlockingQueue();
queue.put(1);
queue.put(2);
queue.put(3);
queue.put(4);
int result = 0;
result = queue.take();
System.out.println(result);
result = queue.take();
System.out.println(result);
result = queue.take();
System.out.println(result);
result = queue.take();
System.out.println(result);
public static void main(String[] args)
MyBlockingQueue queue = new MyBlockingQueue();
queue.put(1);
queue.put(2);
queue.put(3);
queue.put(4);
int result = 0;
result = queue.take();
System.out.println(result);
result = queue.take();
System.out.println(result);
result = queue.take();
System.out.println(result);
result = queue.take();
System.out.println(result);
现在我们在普通队列的基础上加上阻塞功能
阻塞功能意味着该队列是要在多线程环境下使用的
多线程环境下要保证线程安全,需要给方法加锁
使用wait()notify()方法添加阻塞功能
当队列为空时和队列满时都需要阻塞
sleep()方法是指定休眠的时间后唤醒,但是我们不能确定指定的时间是多少,需要看程序运行情况
修改后:
public void put(int value)
synchronized (this)
if(size == items.length)
//队列满了.不能插入
//return;
//阻塞
try
this.wait();
catch (InterruptedException e)
throw new RuntimeException(e);
items[tail] = value;
tail++;
//针对tail的处理
//1)这个写法非常常见
//tail = (tail+1)%items.length;
//2)可读性好并且比求余的代码效率高
if(tail >= items.length)
tail = 0;
//插入成功
size++;
//唤醒队列为空处的wait()
this.notify();
//出队列
public Integer take()
int result = 0;
synchronized (this)
if(size == 0)
//队列为空,不能出队
//return null;
//阻塞
try
this.wait();
catch (InterruptedException e)
throw new RuntimeException(e);
result = items[head];
head++;
if(head >= items.length)
head = 0;
size--;
//
//唤醒队列满的wait()
this.notify();
return result;
public void put(int value)
synchronized (this)
if(size == items.length)
//队列满了.不能插入
//return;
//阻塞
try
this.wait();
catch (InterruptedException e)
throw new RuntimeException(e);
items[tail] = value;
tail++;
//针对tail的处理
//1)这个写法非常常见
//tail = (tail+1)%items.length;
//2)可读性好并且比求余的代码效率高
if(tail >= items.length)
tail = 0;
//插入成功
size++;
//唤醒队列为空处的wait()
this.notify();
//出队列
public Integer take()
int result = 0;
synchronized (this)
if(size == 0)
//队列为空,不能出队
//return null;
//阻塞
try
this.wait();
catch (InterruptedException e)
throw new RuntimeException(e);
result = items[head];
head++;
if(head >= items.length)
head = 0;
size--;
//
//唤醒队列满的wait()
this.notify();
return result;
上述代码还有个问题
如果notifyAll()了,这里的wait()一定会被唤醒!但是该线程还没有抢占到锁,当锁被这个线程抢占到时,队列的状态可能会是满的,因此我们最好用while循环,然后继续判断队列状态
这样是比较稳妥的方法
我们使用MyBlockingQueue再写一个生产者消费者模型,看是否能达到效果
MyBlockingQueue queue = new MyBlockingQueue();
Thread customer = new Thread(()->
while(true)
int result = queue.take();
System.out.println("消费: "+result);
);
customer.start();
Thread producer = new Thread(()->
int count = 0;
while(true)
System.out.println("生产者: "+count);
queue.put(count);
count++;
try
Thread.sleep(500);
catch (InterruptedException e)
throw new RuntimeException(e);
);
producer.start();
MyBlockingQueue queue = new MyBlockingQueue();
Thread customer = new Thread(()->
while(true)
int result = queue.take();
System.out.println("消费: "+result);
);
customer.start();
Thread producer = new Thread(()->
int count = 0;
while(true)
System.out.println("生产者: "+count);
queue.put(count);
count++;
try
Thread.sleep(500);
catch (InterruptedException e)
throw new RuntimeException(e);
);
producer.start();
也是成对出现的,只有生产者向队中插入了,消费者才能获取
我们调整代码,让消费者速度降低
再来看结果
生产者生产的数据将循环队列充满后开始阻塞,消费者线程休眠结束后开始获取数据,获取一个后,阻塞队列便出现一个空位置,唤醒put的wait()后,继续生产数据,然后又阻塞等待队列不满.....
至此就用循环队列实现了阻塞队列
以上是关于多线程案例-阻塞式队列的主要内容,如果未能解决你的问题,请参考以下文章