自定义消息队列

Posted 奔跑在梦想的道路上

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义消息队列相关的知识,希望对你有一定的参考价值。

  传统的网通通信,一般是请求---响应式,以TCP模式为例,在高并发情况下,往往伴随大量的客户端Sokcet请求,服务器要不断处理来自客户端的请求,ServerSocket要不断产生新的子线程去响应客户端的请求,会给服务器带来很大的访问压力。

  在这种情况下,消息队列可谓为我们提供了一种新的思路。队列是数据结构中的一种线性表,队列中存储的元素遵守FIFO(First In First Out,先进先出)的规则,这使得队列中的元素是有序的。我们可以将队列中的元素入队与出队视为生产和消费,此外,再结合发布订阅模式(监听模式),就可以通过队列在不同的线程之间进行通信。

  我在这里为大家提供一个简单的自定义消息队列,谨供参考。

package com.itszt.mq;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 一个消息队列监听器,只要生产者生产出消息并推入队列,就会通知处理器执行消费操作
 */
public class PushBlockQueue extends LinkedBlockingQueue<Object> {
    //多线程执行,采用线程池
    private static ExecutorService es = Executors.newFixedThreadPool(10);
    //单例中的饿汉模式,实例化一个队列单例
    private static PushBlockQueue pbq = new PushBlockQueue();
    //状态标识位
    private boolean flag = false;

    private PushBlockQueue() {
    }

    public static PushBlockQueue getInstance() {
        return pbq;
    }

    /**
     * 队列监听启动
     */
    public void start() {
        if (!this.flag) {
            flag = true;
        } else {
            throw new IllegalArgumentException("队列已启动,不可重复启动!");
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (flag) {
                        //从队列中取消息
                        Object obj = take();
                        //线程池派出线程来消费取出的消息
                        es.execute(new PushBlockQueueHandler(obj));
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * 停止队列监听
     */
    public void stop(){
        this.flag=false;
    }
}
----------------------------------------------------
package com.itszt.mq;

/**
 *相当于队列消息的消费者
 */
public class PushBlockQueueHandler implements Runnable{
    //消费的对象
    private Object obj;

    public PushBlockQueueHandler(Object obj){
        this.obj=obj;
    }
    //消费线程
    @Override
    public void run() {
        doBusiness();
    }
    //消费行为
    private void doBusiness() {
        System.out.println(Thread.currentThread().getName()+"-收到消息:"+obj);
    }
}
-----------------------------------------------
package com.itszt.mq;

import java.util.Scanner;

/**
 * 自定义消息队列测试类
 */
public class MQTest {
    public static void main(String[] args) {
        //获取消息队列的单例,并启动队列监听器
        PushBlockQueue.getInstance().start();
        //循环向队列写入数据
        /**
         * 生产者----生产消息----》入队列----监听器----通知消费者---》消费
         */
        Scanner sc=new Scanner(System.in);
        try {
            while (true){
                String content = sc.nextLine();
                if(content.trim().equals("stop")){
                    System.exit(1);
                }
                PushBlockQueue.getInstance().put(content);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

   启动程序,你(主线程)就可以在控制台里通过消息队列向其他线程发布消息了!

中国梦,大家的梦!
pool-1-thread-1-收到消息:中国梦,大家的梦!
为中华民族的伟大复兴而不懈奋斗!
pool-1-thread-2-收到消息:为中华民族的伟大复兴而不懈奋斗! 

以上是关于自定义消息队列的主要内容,如果未能解决你的问题,请参考以下文章

java多线程之自定义消息队列

Linux编程之自定义消息队列

Linux编程之自定义消息队列

自定义消息队列

Linux编程之自定义消息队列

创建自定义死信队列