消息队列MQ
Posted 程序员的猫
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列MQ相关的知识,希望对你有一定的参考价值。
MQ(message/queue)是在消息的传输过程中保存消息的容器。
为什么用MQ??
主要是为了解决异步消息,应用解耦,流量削峰等问题
异步消息:我只用两张经典的图做说明,不做过多解释
从响应过程上看很明显的体现出消息队列的作用了。
应用解耦:在一个电商系统中,每产生新一个订单需要调用库存系统的接口,改变库存数量。当库存系统出现异常时就会影响订单的产生。所以订单系统和库存系统之间是存在很强的耦合。引入消息队列之后,成功给两个系统解耦。
流量削峰:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。控制队列中请求数量,当超过这个请求数量时,直接返回秒杀结束。系统只需处理消息队列中存在的请求就可以。
本文介绍的是Apache的ActiveMQ
启动服务:进入bin目录下运行activemq.bat文件
启动成功在浏览器中输入:http://localhost:8161/admin/ 默认账号:admin 密码:admin 见到如下界面说明启动成功
为了看懂下面的示例,首先接收两个知识点:
JMS消息模型(java message servers)
p2p(点对点)
角色:队列(queue)、发送者、接收者
特点:一个消息只能被一个接收者所消费,发送者和接收者在时间上没有依赖(收短信),接收到消息后会给队列一个应答
Pub/Sub(发布订阅)
角色:主题(topic)、发布者、订阅者
特点:每个消息可以有多个消费者,发布者和订阅者有严格时间依赖(听广播)(但是可以做到消息持久化订阅)
ActiveMQ Topic 消息重发
JMS消息确定机制有以下4种
AUTO_ACKNOWLEDGE = 1 自动确定
CLIENT_ACKNOWLEDGE=2 客户端手动确认 message.acknowledge()应答表示确认 sesssion.recover()表示处理过程出现异常,需要重发消息
DUPS_ACKNOWLEDGE = 3 自动批量确认
SESSION_TRANSACTED = 0 事物提交并确认
他们都在session接口中定义成了常量
代码实战:
1、创建maven工程
2、添加mq的jar包和Junit的jar包
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
重要的5个对象
Connection、Session、MessageProducer/MessageConsumer、Destination、Message
Connection :JMS 客户端到JMS 供应者的连接
Session: 一个发送或接收消息的线程(会话)
MessageProducer/MessageConsumer:消息发送者/接受者
Destination:消息源;消息发送给谁/从哪里接收(topic、queue)
Message:消息内容(有5种类型)
1. TextMessage:java.lang.String对象,如xml文件内容。
2. MapMessage:key/value键值对的集合,key是String对象,值类型可以是Java任何基本类型。
3. BytesMessage:字节流。
4. StreamMessage:Java 中的输入输出流。
5. ObjectMessage:Java中的可序列化对象。
另外,还有一种是Message,没有消息体,只有消息头和属性。
3、撸起袖子干
消息发布整个流程
消息接收整个流程
对上面的代码做几点总结:
一:发布的时候需要关闭资源,接收的时候不需要关闭,程序要一直监听message。
二:上述示例中是采用Topic类型的destination,如果是queue类型则创建时有所不同。这两种类型正是对应着JMS消息模型中的两种方式。
三:上述示例中没有体现重发机制,当出现异常时调用sesssion.recover()方法,MQ将会重发消息到订阅者,默认重发6次。
四:总结一下发布流程:
创建ConnectionFactory工厂
创建Connection
启用Connection
创建Session
通过session创建producer发送消息,创建时需要传入一个Destination(这个对象是用来确定用那种JMS模型来接收消息)
创建消息
发送消息
关闭资源
对于消息接收来说
5.创建consumer
6.通过consumer监听消息
7.处理消息
8.向MQ响应
以上是关于消息队列MQ的主要内容,如果未能解决你的问题,请参考以下文章