java多线程之自定义消息队列

Posted wen-pan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java多线程之自定义消息队列相关的知识,希望对你有一定的参考价值。

说明

这里将自己定义实现一个消息队列,该消息队列可以手动配置队列容量,可以供生产者生产消息,可以供消费者消费消息。

1、自定义消息队列

消息类

@Data
public class Message {
    // 消息ID
    private final int id;
    // 消息内容
    private final Object message;

    public Message(final int id, final Object message) {
        this.id = id;
        this.message = message;
    }
}

消息队列类

根据面向对象的思想,消息队列需要提供put消息和take消息的方法。

@Slf4j
public class MessageQuene {
    /**
     * 双向队列
     */
    private final LinkedList<Message> queue = new LinkedList<>();
    /**
     * 消息队列的容量,默认容量110
     */
    private int capacity = 10;

    public MessageQuene(final int capacity) {
        this.capacity = capacity;
    }

    /**
     * 向消息队列put消息
     */
    public void put(final Message message) throws InterruptedException {
        synchronized (this.queue) {
            // 超出容量,则在这里等待消费者取走消息
            while (this.queue.size() >= this.capacity) {
                log.info("消息队列已满,等待消费者消费消息.....");
                this.queue.wait();
            }
            // 从尾部加,从头部取
            this.queue.addLast(message);
            this.queue.notifyAll();
        }
    }

    /**
     * 从消息队列获取消息
     */
    public Message take() throws InterruptedException {
        synchronized (this.queue) {
            while (this.queue.isEmpty()) {
                log.info("消息队列已空,等待生产者生产消息.....");
                // 这里将中断异常抛出
                this.queue.wait();
            }
            // 从尾部加,从头部取,取出后就从队列删除
            final Message message = this.queue.pollFirst();
            this.queue.notifyAll();
            return message;
        }
    }
}

2、自定义消息队列测试

@Slf4j
public class MessageQueneTest {

    static final MessageQuene messageQuene = new MessageQuene(2);

    public static void main(final String[] args) {
        // 消费者不断从队列中消费消息
        final Thread t1 = new Thread(MessageQueneTest::consumeMessage, "t1");
        final Thread t2 = new Thread(MessageQueneTest::consumeMessage, "t2");
        // 生产者,不断向队列中生产消息
        final Thread t3 = new Thread(MessageQueneTest::putMessage, "t3");

        t1.start();
        t2.start();
        t3.start();
    }

    public static void consumeMessage() {
        try {
            while (true) {
                final Message message = messageQuene.take();
                log.info("消费者消费到消息,id = {},message = {}", message.getId(), message.getMessage());
            }
        } catch (final InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void putMessage() {
        try {
            while (true) {
                final String message = UUID.randomUUID().toString();
                final int id = RandomUtils.nextInt();
                log.info("生产者生产消息,id = {},message = {}", id, message);
                messageQuene.put(new Message(id, message));
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (final InterruptedException e) {
            e.printStackTrace();
        }
    }
}

测试结果

11:27:58.787 [t1] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:58.794 [t2] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:58.802 [t3] INFO com.wp.juc.hm.test.MessageQueneTest - 生产者生产消息,id = 872383243,message = 0b06d297-a59a-4a47-9f78-ddd36eb332fc
11:27:58.805 [t1] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:58.805 [t2] INFO com.wp.juc.hm.test.MessageQueneTest - 消费者消费到消息,id = 872383243,message = 0b06d297-a59a-4a47-9f78-ddd36eb332fc
11:27:58.805 [t2] INFO com.wp.juc.hm.test.MessageQuene - 消息队列已空,等待生产者生产消息.....
11:27:59.809 [t3] INFO com.wp.juc.hm.test.MessageQueneTest - 生产者生产消息,id = 773935934,message = 0fe599b8-bf43-41e7-9158-96a6d12b8a25

以上是关于java多线程之自定义消息队列的主要内容,如果未能解决你的问题,请参考以下文章

Linux编程之自定义消息队列

Linux编程之自定义消息队列

多线程之模拟自定义消息队列

TWEN-ASR ONE 语音识别系列教程---多线程与消息队列使用

TWEN-ASR ONE 语音识别系列教程---多线程与消息队列使用

TWEN-ASR ONE 语音识别系列教程---多线程与消息队列使用