大数据必学Java基础(六十六):BlockingQueue常见子类
Posted Lansonli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据必学Java基础(六十六):BlockingQueue常见子类相关的知识,希望对你有一定的参考价值。
文章目录
4、其他的添加或者获取的方法都是依托与这个入队和出队的基础方法
BlockingQueue常见子类
一、ArrayBlockingQueue
源码中的注释的解释说明:
1、添加元素
package com.lanson.test05;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author : Lansonli
*/
public class Test01
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException
//创建一个队列,队列可以指定容量指定长度3:
ArrayBlockingQueue aq = new ArrayBlockingQueue(3);
//添加元素:
//【1】添加null元素:不可以添加null元素,会报空指针异常:NullPointerException
//aq.add(null);
//aq.offer(null);
//aq.put(null);
//【2】正常添加元素:
aq.add("aaa");
aq.offer("bbb");
aq.put("ccc");
System.out.println(aq);//[aaa, bbb, ccc]
//【3】在队列满的情况下,再添加元素:
//aq.add("ddd");//在队列满的情况下,添加元素 出现异常:Queue full
//System.out.println(aq.offer("ddd"));//没有添加成功,返回false
//设置最大阻塞时间,如果时间到了,队列还是满的,就不再阻塞了
//aq.offer("ddd",2, TimeUnit.SECONDS);
//真正阻塞的方法: put ,如果队列满,就永远阻塞
aq.put("ddd");
System.out.println(aq);
2、获取元素
package com.lanson.test05;
import javax.sound.midi.Soundbank;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author : Lansonli
*/
public class Test02
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException
//创建一个队列,队列可以指定容量指定长度3:
ArrayBlockingQueue aq = new ArrayBlockingQueue(3);
aq.add("aaa");
aq.add("bbb");
aq.add("ccc");
//得到头元素但是不移除
System.out.println(aq.peek());
System.out.println(aq);
//得到头元素并且移除
System.out.println(aq.poll());
System.out.println(aq);
//得到头元素并且移除
System.out.println(aq.take());
System.out.println(aq);
//清空元素:
aq.clear();
System.out.println(aq);
System.out.println(aq.peek());//null
System.out.println(aq.poll());//null
//设置阻塞事件,如果队列为空,返回null,时间到了以后就不阻塞了
//System.out.println(aq.poll(2, TimeUnit.SECONDS));
//真正阻塞:队列为空,永远阻塞
System.out.println(aq.take());
3、源码
public class ArrayBlockingQueue<E>
//底层就是一个数组:
final Object[] items;
//取元素用到的索引,初始结果为0
int takeIndex;
//放元素用到的索引,初始结果为0
int putIndex;
//数组中元素的个数:
int count;
//一把锁:这个锁肯定很多方法中用到了,所以定义为属性,初始化以后可以随时使用
final ReentrantLock lock;
//锁伴随的一个等待吃:notEmpty
private final Condition notEmpty;
//锁伴随的一个等待吃:notFull
private final Condition notFull;
//构造器:
public ArrayBlockingQueue(int capacity) //传入队列指定的容量
this(capacity, false);
public ArrayBlockingQueue(int capacity, boolean fair) //传入队列指定的容量
//健壮性考虑
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化底层数组
this.items = new Object[capacity];
//初始化锁 和 等待队列
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
//两个基本方法:一个是入队,一个是出队 ,是其他方法的基础:
//入队:
private void enqueue(E x)
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;//底层数组赋给items
//在对应的下标位置放入元素
items[putIndex] = x;
if (++putIndex == items.length) //++putIndex putIndex 索引 加1
putIndex = 0;
//每放入一个元素,count加1操作
count++;
notEmpty.signal();
//出队:
private E dequeue()
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;//底层数组赋给items
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];//在对应的位置取出元素
items[takeIndex] = null;//对应位置元素取出后就置为null
if (++takeIndex == items.length)//++takeIndex 加1操作
takeIndex = 0;
count--;//每取出一个元素,count减1操作
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;//将取出的元素作为方法的返回值
takeIndex和putIndex置为0的原因:
4、其他的添加或者获取的方法都是依托与这个入队和出队的基础方法
5、感受一下put和take的阻塞
上面的while不可以换为if,因为如果notFull中的线程被激活的瞬间,有其他线程放入元素,那么队列就又满了。
那么沿着await后面继续执行就不可以,所以一定要反复确定队列是否满的,才能放入元素。
二、LinkedBlockingQueue
一个可选择的有边界的队列:
意思就是队列的长度可以指定,也可以不指定
1、添加元素
package com.lanson.test05;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author : Lansonli
*/
public class Test01
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException
//创建一个队列,队列可以指定容量指定长度3:
LinkedBlockingQueue aq = new LinkedBlockingQueue(3);
//添加元素:
//【1】添加null元素:不可以添加null元素,会报空指针异常:NullPointerException
//aq.add(null);
//aq.offer(null);
aq.put(null);
//【2】正常添加元素:
aq.add("aaa");
aq.offer("bbb");
aq.put("ccc");
System.out.println(aq);//[aaa, bbb, ccc]
//【3】在队列满的情况下,再添加元素:
//aq.add("ddd");//在队列满的情况下,添加元素 出现异常:Queue full
//System.out.println(aq.offer("ddd"));//没有添加成功,返回false
//设置最大阻塞时间,如果时间到了,队列还是满的,就不再阻塞了
//aq.offer("ddd",2, TimeUnit.SECONDS);
//真正阻塞的方法: put ,如果队列满,就永远阻塞
aq.put("ddd");
System.out.println(aq);
2、取出元素
package com.lanson.test05;
import javax.sound.midi.Soundbank;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author : Lansonli
*/
public class Test02
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException
//创建一个队列,队列可以指定容量指定长度3:
LinkedBlockingQueue aq = new LinkedBlockingQueue();
aq.add("aaa");
aq.add("bbb");
aq.add("ccc");
//得到头元素但是不移除
System.out.println(aq.peek());
System.out.println(aq);
//得到头元素并且移除
System.out.println(aq.poll());
System.out.println(aq);
//得到头元素并且移除
System.out.println(aq.take());
System.out.println(aq);
//清空元素:
aq.clear();
System.out.println(aq);
System.out.println(aq.peek());//null
System.out.println(aq.poll());//null
//设置阻塞事件,如果队列为空,返回null,时间到了以后就不阻塞了
//System.out.println(aq.poll(2, TimeUnit.SECONDS));
//真正阻塞:队列为空,永远阻塞
System.out.println(aq.take());
3、特点
ArrayBlockingQueue : 不支持读写同时操作,底层基于数组的。
LinkedBlockingQueue:支持读写同时操作,并发情况下,效率高。底层基于链表。
4、源码
入队操作:
出队操作:
public class LinkedBlockingQueue<E>
//内部类Node就是链表的节点的对象对应的类:
static class Node<E>
E item;//封装你要装的那个元素
Node<E> next;//下一个Node节点的地址
Node(E x) item = x; //构造器
//链表的长度
private final int capacity;
//计数器:
private final AtomicInteger count = new AtomicInteger();
//链表的头结点
transient Node<E> head;
//链表的尾结点
private transient Node<E> last;
//取元素用的锁
private final ReentrantLock takeLock = new ReentrantLock();
//等待池
private final Condition notEmpty = takeLock.newCondition();
//放元素用的锁
private final ReentrantLock putLock = new ReentrantLock();
//等待池
private final Condition notFull = putLock.newCondition();
public LinkedBlockingQueue()
this(Integer.MAX_VALUE);//调用类本类的空构造器,传入正21亿
public LinkedBlockingQueue(int capacity)
//健壮性考虑
if (capacity <= 0) throw new IllegalArgumentException();
//给队列指定长度
this.capacity = capacity;
//last,head指向一个新的节点,新的节点中 元素为null
last = head = new Node<E>(null);
//入队:
private void enqueue(Node<E> node)
last = last.next = node;
//出队:
private E dequeue()
Node<E> h = head;//h指向了head
Node<E> first = h.next;//first 指向head的next
h.next = h; // help GC h.next指向自己,更容易被GC发现 被GC
head = first;//head的指向指为first
E x = first.item;//取出链中第一个元素,给了x
first.item = null;
return x;//把x作为方法的返回值
5、put的阻塞
阻塞的前提是 队列是固定长度的
三、SynchronousQueue
这个特殊的队列设计的意义:
1、先添加元素
public class Test01
//这是main方法,程序的入口
public static void main(String[] args)
SynchronousQueue sq = new SynchronousQueue();
sq.add("aaa");
直接报错:说队列满了,因为队列没有容量,理解为满也是正常的
2、put方法阻塞
队列是空的,可以理解为队列满了,满的话放入元素 put 一定会阻塞
public class Test01
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException
SynchronousQueue sq = new SynchronousQueue();
sq.put("aaa");
3、先取再放
package com.lanson.test06;
import java.util.concurrent.SynchronousQueue;
/**
* @author : Lansonli
*/
public class Test02
//这是main方法,程序的入口
public static void main(String[] args)
SynchronousQueue sq = new SynchronousQueue();
//创建一个线程,取数据:
new Thread(new Runnable()
@Override
public void run()
while(true)
try
System.out.println(sq.take());
catch (InterruptedException e)
e.printStackTrace();
).start();
//搞一个线程,往里面放数据:
new Thread(new Runnable()
@Override
public void run()
try
sq.put("aaa");
sq.put("bbb");
sq.put("ccc");
sq.put("ddd");
catch (InterruptedException e)
e.printStackTrace();
).start();
结果:
4、poll方法
package com.lanson.test06;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* @author : Lansonli
*/
public class Test02
//这是main方法,程序的入口
public static void main(String[] args)
SynchronousQueue sq = new SynchronousQueue();
//创建一个线程,取数据:
new Thread(new Runnable()
@Override
public void run()
while(true)
try
//设置一个阻塞事件:超出事件就不阻塞了
Object result = sq.poll(5, TimeUnit.SECONDS);
System.out.println(result);
if(result == null)
break;
catch (InterruptedException e)
e.printStackTrace();
).start();
//搞一个线程,往里面放数据:
new Thread(new Runnable()
@Override
public void run()
try
sq.put("aaa");
sq.put("bbb");
sq.put("ccc");
sq.put("ddd");
catch (InterruptedException e)
e.printStackTrace();
).start();
注意:取出元素 不能用peek,因为peek不会将元素从队列中拿走,只是查看的效果;
四、PriorityBlockingQueue
带有优先级的阻塞队列。
优先级队列,意味着队列有先后顺序的,数据有不同的权重。
无界的队列,没有长度限制,但是在你不指定长度的时候,默认初始长度为11,也可以手动指定,
当然随着数据不断的加入,底层(底层是数组Object[])会自动扩容,直到内存全部消耗殆尽了,导致 OutOfMemoryError内存溢出 程序才会结束。
不可以放入null元素的,不允许放入不可比较的对象(导致抛出ClassCastException),对象必须实现内部比较器或者外部比较器。
1、添加null数据
public class Test
//这是main方法,程序的入口
public static void main(String[] args)
PriorityBlockingQueue pq = new PriorityBlockingQueue();
pq.put(null);
2、添加四个数据
package com.lanson.test07;
/**
* @author : Lansonli
*/
public class Student implements Comparable<Student>
String name;
int age;
public Student()
public Student(String name, int age)
this.name = name;
this.age = age;
@Override
public String toString()
return "Student" +
"name='" + name + '\\'' +
", age=" + age +
'';
@Override
public int compareTo(Student o)
return this.age - o.age;
package com.lanson.test07;
import java.util.concurrent.PriorityBlockingQueue;
/**
* @author : Lansonli
*/
public class Test02
//这是main方法,程序的入口
public static void main(String[] args)
PriorityBlockingQueue<Student> pq = new PriorityBlockingQueue<>();
pq.put(new Student("nana",18));
pq.put(new Student("lulu",11));
pq.put(new Student("feifei",6));
pq.put(new Student("mingming",21));
System.out.println(pq);
结果:
发现结果并没有按照优先级顺序排列
3、取出数据
package com.lanson.test07;
import java.util.concurrent.PriorityBlockingQueue;
/**
* @author : Lansonli
*/
public class Test02
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException
PriorityBlockingQueue<Student> pq = new PriorityBlockingQueue<>();
pq.put(new Student("nana",18));
pq.put(new Student("lulu",11));
pq.put(new Student("feifei",6));
pq.put(new Student("mingming",21));
System.out.println("------------------------------------------");
System.out.println(pq.take());
System.out.println(pq.take());
System.out.println(pq.take());
System.out.println(pq.take());
从结果证明,这个优先级队列,并不是在put数据的时候计算谁在前谁在后
而是取数据的时候,才真正判断谁在前谁在后
优先级:取数据的优先级
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
以上是关于大数据必学Java基础(六十六):BlockingQueue常见子类的主要内容,如果未能解决你的问题,请参考以下文章
大数据必学Java基础(六十七):DelayQueue深入了解