消息队列MQ

Posted 程序员的猫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列MQ相关的知识,希望对你有一定的参考价值。

MQ(message/queue)是在消息的传输过程中保存消息的容器。

为什么用MQ??

主要是为了解决异步消息,应用解耦,流量削峰等问题


异步消息:我只用两张经典的图做说明,不做过多解释

消息队列MQ

从响应过程上看很明显的体现出消息队列的作用了。


应用解耦:在一个电商系统中,每产生新一个订单需要调用库存系统的接口,改变库存数量。当库存系统出现异常时就会影响订单的产生。所以订单系统和库存系统之间是存在很强的耦合。引入消息队列之后,成功给两个系统解耦。

消息队列MQ

流量削峰:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。控制队列中请求数量,当超过这个请求数量时,直接返回秒杀结束。系统只需处理消息队列中存在的请求就可以。


本文介绍的是Apache的ActiveMQ



启动服务:进入bin目录下运行activemq.bat文件

消息队列MQ

启动成功在浏览器中输入:http://localhost:8161/admin/ 默认账号:admin 密码:admin 见到如下界面说明启动成功


消息队列MQ


为了看懂下面的示例,首先接收两个知识点:


JMS消息模型(java message servers)

p2p(点对点)

角色:队列(queue)、发送者、接收者

特点:一个消息只能被一个接收者所消费,发送者和接收者在时间上没有依赖(收短信),接收到消息后会给队列一个应答


Pub/Sub(发布订阅)

角色:主题(topic)、发布者、订阅者

特点:每个消息可以有多个消费者,发布者和订阅者有严格时间依赖(听广播)(但是可以做到消息持久化订阅)


ActiveMQ Topic 消息重发


JMS消息确定机制有以下4种

AUTO_ACKNOWLEDGE = 1 自动确定

CLIENT_ACKNOWLEDGE=2 客户端手动确认 message.acknowledge()应答表示确认 sesssion.recover()表示处理过程出现异常,需要重发消息

DUPS_ACKNOWLEDGE = 3  自动批量确认

SESSION_TRANSACTED = 0 事物提交并确认

他们都在session接口中定义成了常量


代码实战:

 1、创建maven工程


消息队列MQ

2、添加mq的jar包和Junit的jar包

<dependencies>

     <dependency>

         <groupId>org.apache.activemq</groupId>

         <artifactId>activemq-client</artifactId>

         <version>5.14.5</version>

     </dependency>   

     <dependency>

         <groupId>junit</groupId>

         <artifactId>junit</artifactId>

         <version>4.12</version>

         <scope>test</scope>

     </dependency>   

  </dependencies>

重要的5个对象

Connection、Session、MessageProducer/MessageConsumerDestination、Message

Connection :JMS 客户端到JMS 供应者的连接

Session: 一个发送或接收消息的线程(会话)

MessageProducer/MessageConsumer:消息发送者/接受者

Destination:消息源;消息发送给谁/从哪里接收(topic、queue

Message:消息内容(有5种类型)

1. TextMessage:java.lang.String对象,如xml文件内容。

2. MapMessage:key/value键值对的集合,key是String对象,值类型可以是Java任何基本类型。 

3. BytesMessage:字节流。

4. StreamMessage:Java 中的输入输出流。

5. ObjectMessage:Java中的可序列化对象。

另外,还有一种是Message,没有消息体,只有消息头和属性。


3、撸起袖子干

消息发布整个流程


消息队列MQ

消息接收整个流程

消息队列MQ

对上面的代码做几点总结:

一:发布的时候需要关闭资源,接收的时候不需要关闭,程序要一直监听message。

二:上述示例中是采用Topic类型的destination,如果是queue类型则创建时有所不同。这两种类型正是对应着JMS消息模型中的两种方式。

三:上述示例中没有体现重发机制,当出现异常时调用sesssion.recover()方法,MQ将会重发消息到订阅者,默认重发6次。

消息队列MQ

四:总结一下发布流程:

  1. 创建ConnectionFactory工厂

  2. 创建Connection

  3. 启用Connection

  4. 创建Session

  5. 通过session创建producer发送消息,创建时需要传入一个Destination(这个对象是用来确定用那种JMS模型来接收消息)

  6. 创建消息

  7. 发送消息

  8. 关闭资源


对于消息接收来说

  5.创建consumer

  6.通过consumer监听消息

  7.处理消息

  8.向MQ响应






以上是关于消息队列MQ的主要内容,如果未能解决你的问题,请参考以下文章

深入消息队列MQ,看这篇就够了!

消息队列(mq)是啥?

消息队列(mq)是啥?

MQ系列——MQ简介

MQ消息队列

消息队列MQ 之 Kafka