MQ消息队列(Message Queue)

Posted 嗨小叔的程序猿之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQ消息队列(Message Queue)相关的知识,希望对你有一定的参考价值。

官方定义:

    MQ消息队列( Message Queue):是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

    文字性的定义写了一大堆,那么具体是什么呢?

    前段时间的项目有用到IBM的MQ消息队列,应用场景大致是这样子的,上游系统不断发送报文到MQ容器,而我们主要做的是从MQ容器中将报文取出,进行解析,数据加工,再将加工后的结果数据存入数据库及返回给MQ容器,上游系统再到MQ容器中去取返回的消息。

    大致处理流程如下图:


  • 消息队列就是保存消息的容器

  • 可以随时去消息队列容器中取数据,不用讲究实时性。

  • 与传统RPC区别就是消息异步分发,非实时。

  • 生产者:负责消息的产生和发送到MQ容器

  • 消费者:负责从MQ获取消息,并进行相应处理,不关心有多少个消费者。

  • MQ容器:负责消息存储、确认、重试等,一般包含多个队列。


特性:

  • 异步性:将耗时的同步操作,通过以发送消息的方式,进行了异步化处理。减少了同步等待的时间

  • 松耦合:减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节,只要定义好消息的格式就行。

  • 分布式:通过对消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性(当然消息队列本身也可以做成分布式集群)。

  • 可靠性:消息队列一般会把接收到的消息存储到本地硬盘上(当消息被处理完之后,存储信息根据不同的消息队列实现,有可能将其删除),这样即使应用挂掉或者消息队列本身挂掉,消息也能够重新加载。


应用场景:

  • 物联网:例如智能家居,通过MTQQ这个协议,再加上IBM的WebSphere MQ 消息队列管理器,可以轻松的实现物联网管理。

  • 推送/聊天应用系统:消息队列+通信协议消息队列可以实现消息的订阅,以及消息的分发,通过消息队列可以进行点对点推送,一对多广播式推送

  • 日志监控:消息队列会使得我们处理日志的异步记录等等得到很好的解决

  • 流量削峰:一般在秒杀、团抢活动、短信群发等场景使用秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉,加入消息队列假如消息队列长度超过最大数量,则直接抛弃用户请求告知用户系统忙,稍后重试;秒杀业务根据消息队列中的请求信息,再做后续处理。

  • 异步处理:也就是刚刚开头嗨小叔说的项目中的那种情况,不讲究时效性,异步处理信息。


流行的消息中间件与通信协议:

  • AMQPhttp://www.amqp.org/node/

  • Apache Qpid(Apache开发的,是AMQP的实现):http://qpid.apache.org/

  • ActiveMQhttp://activemq.apache.org

    ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景

  • RabbitMQ(AMQP协议的实现):http://www.rabbitmq.com/

    RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。

  • RocketMQ(阿里巴巴):https://github.com/alibaba/rocketmq?spm=5176.doc29532.2.1.I7qrLz

  • STOMP Protocol Specification:http://stomp.github.com/stomp-specification-1.2.html

  • HornetQ(JBoss的MQ):http://www.jboss.org/hornetq

  • Open JMS:http://openjms.sourceforge.net/

  • ZeroMQhttp://www.zeromq.org/

    ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)

  • Kafka(不成熟的MQ,可看作是日志处理系统)http://kafka.apache.org/

  • WebSphere MQ(IBM产品):https://www.ibm.com/developerworks/cn/downloads/ws/wmq/

    在之前嗨小叔介绍过Redis(),其实Redis也是支持MQ功能的。(一般处于发布者订阅者模式和生产者消费者模式的场景的消息队列,Redis都能够实现)Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。


简单实战:

例子一、Rabbitmq+Spring:

1、生产者配置:

application-mq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

   <!-- 连接服务配置  -->
   <rabbit:connection-factory id="connectionFactory" host="localhost" username="test"     password="test" port="8098"  />
        
   <rabbit:admin connection-factory="connectionFactory"/>
   
   <!-- queue 队列声明-->
   <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
   <!-- durable:是否持久化
exclusive: 仅创建者可以使用的私有队列,断开后自动删除
auto_delete: 当所有消费客户端连接断开后,是否自动删除队列  -->

<!-- exchange queue binging key 绑定 -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

<!-- rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发
rabbit:binding:设置消息queue匹配的key     -->

    <-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
    <bean id="jsonMessageConverter"  class="mq.convert.FastJsonMessageConverter"></bean>
    
    <-- spring template声明-->
    <rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter"/>
</beans>

2、fastjson messageconver插件实现

FastJsonMessageConverter .java

public class FastJsonMessageConverter  extends AbstractMessageConverter {
    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);
    public static final String DEFAULT_CHARSET = "UTF-8";
    private volatile String defaultCharset = DEFAULT_CHARSET;
    public FastJsonMessageConverter() {
        super();
        //init();
    }
    public void setDefaultCharset(String defaultCharset) {
        this.defaultCharset = (defaultCharset != null) ? defaultCharset
                : DEFAULT_CHARSET;
    }
    public Object fromMessage(Message message)
            throws MessageConversionException {
        return null;
    }
    public <T> T fromMessage(Message message,T t) {
        String json = "";
        try {
            json = new String(message.getBody(),defaultCharset);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return (T) FastJson.fromJson(json, t.getClass());
    }    
   
    protected Message createMessage(Object objectToConvert,
            MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] bytes = null;
        try {
            String jsonString = FastJson.toJson(objectToConvert);
            bytes = jsonString.getBytes(this.defaultCharset);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(this.defaultCharset);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);
    }
}

3、生产者端调用

MyMqGatway .java

public class MyMqGatway {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void sendDataToCrQueue(Object obj) {
        amqpTemplate.convertAndSend("queue_one_key", obj);
    }    
}

<!-- convertAndSend:将Java对象转换为消息发送到匹配Key的交换机中Exchange,由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。原文:Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. -->

4、消费者端配置

application-mq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
   <!-- 连接服务配置  -->
   <rabbit:connection-factory id="connectionFactory" host="localhost" username="test"        password="test" port="8098"  />
          <rabbit:admin connection-factory="connectionFactory"/>
   <!-- queue 队列声明-->
   <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
   
   <!-- exchange queue binging key 绑定,根据绑定的值取数据 -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="queue_one" ref="queueOneLitener"/>
    </rabbit:listener-container>
</beans>

  <!--queues:监听的队列,多个的话用逗号(,)分隔
ref:监听器-->

5、消费者端调用

QueueOneLitener.java

public class QueueOneLitener implements  MessageListener{
    @Override
    public void onMessage(Message message) {
        System.out.println(" data :" + message.getBody());
    }
}


    MQ消息中间件有很多,就不一一讲解,这边从网上摘抄了一个例子。仅供各位参考使用,由于之前项目IBM的MQ源码拿不出来,所以就不在这边放,如果后面有机会拿回来,再补上IBM的MQ源码部分,供大家学习参考。



以上是关于MQ消息队列(Message Queue)的主要内容,如果未能解决你的问题,请参考以下文章

消息队列MQ

带你整理面试过程中关于消息队列MQ的相关知识

2020-12-25:MQ中,如何保证消息的顺序性?

深入了解ActiveMQ!

MQ消息队列及常见MQ比较

项目设计中MQ(message queue)使用总结