java多线程之自定义消息队列
Posted wen-pan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java多线程之自定义消息队列相关的知识,希望对你有一定的参考价值。
说明
这里将自己定义实现一个消息队列,该消息队列可以手动配置队列容量,可以供生产者生产消息,可以供消费者消费消息。
1、自定义消息队列
消息类
@Data
public class Message {
// 消息ID
private final int id;
// 消息内容
private final Object message;
public Message(final int id, final Object message) {
this.id = id;
this.message = message;
}
}
消息队列类
根据面向对象的思想,消息队列需要提供put消息和take消息的方法。
@Slf4j
public class MessageQuene {
/**
* 双向队列
*/
private final LinkedList<Message> queue = new LinkedList<>();
/**
* 消息队列的容量,默认容量110
*/
private int capacity = 10;
public MessageQuene(final int capacity) {
this.capacity = capacity;
}
/**
* 向消息队列put消息
*/
public void put(final Message message) throws InterruptedException {
synchronized (this.queue) {
// 超出容量,则在这里等待消费者取走消息
while (this.queue.size() >= this.capacity) {
log.info("消息队列已满,等待消费者消费消息.....");
this.queue.wait();
}
// 从尾部加,从头部取
this.queue.addLast(message);
this.queue.notifyAll();
}
}
/**
* 从消息队列获取消息
*/
public Message take() throws InterruptedException {
synchronized (this.queue) {
while (this.queue.isEmpty()) {
log.info("消息队列已空,等待生产者生产消息.....");
// 这里将中断异常抛出
this.queue.wait();
}
// 从尾部加,从头部取,取出后就从队列删除
final Message message = this.queue.pollFirst();
this.queue.notifyAll();
return message;
}
}
}
2、自定义消息队列测试
@Slf4j
public class MessageQueneTest {
static final MessageQuene messageQuene = new MessageQuene(2);
public static void main(final String[] args) {
// 消费者不断从队列中消费消息
final Thread t1 = new Thread(MessageQueneTest::consumeMessage, "t1");
final Thread t2 = new Thread(MessageQueneTest::consumeMessage, "t2");
// 生产者,不断向队列中生产消息
final Thread t3 = new Thread(MessageQueneTest::putMessage, "t3");
t1.start();
t2.start();
t3.start();
}
public static void consumeMessage() {
try {
while (true) {
final Message message = messageQuene.take();
log.info("消费者消费到消息,id = {},message = {}", message.getId(), message.getMessage());
}
} catch (final InterruptedException e) {
e.printStackTrace();
}
}
public static void putMessage() {
try {
while (true) {
final String message = UUID.randomUUID().toString();
final int id = RandomUtils.nextInt();
log.info("生产者生产消息,id = {},message = {}", id, message);
messageQuene.put(new Message(id, message));
TimeUnit.SECONDS.sleep(1);
}
} catch (final InterruptedException e) {
e.printStackTrace();
}
}
}
测试结果
11:27:58.787 [t1] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:58.794 [t2] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:58.802 [t3] INFO com.wp.juc.hm.test.MessageQueneTest - 生产者生产消息,id = 872383243,message = 0b06d297-a59a-4a47-9f78-ddd36eb332fc
11:27:58.805 [t1] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:58.805 [t2] INFO com.wp.juc.hm.test.MessageQueneTest - 消费者消费到消息,id = 872383243,message = 0b06d297-a59a-4a47-9f78-ddd36eb332fc
11:27:58.805 [t2] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:59.809 [t3] INFO com.wp.juc.hm.test.MessageQueneTest - 生产者生产消息,id = 773935934,message = 0fe599b8-bf43-41e7-9158-96a6d12b8a25
以上是关于java多线程之自定义消息队列的主要内容,如果未能解决你的问题,请参考以下文章
TWEN-ASR ONE 语音识别系列教程---多线程与消息队列使用