MQ消息队列(Message Queue)
Posted 嗨小叔的程序猿之路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQ消息队列(Message Queue)相关的知识,希望对你有一定的参考价值。
官方定义:
MQ消息队列( Message Queue):是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
文字性的定义写了一大堆,那么具体是什么呢?
前段时间的项目有用到IBM的MQ消息队列,应用场景大致是这样子的,上游系统不断发送报文到MQ容器,而我们主要做的是从MQ容器中将报文取出,进行解析,数据加工,再将加工后的结果数据存入数据库及返回给MQ容器,上游系统再到MQ容器中去取返回的消息。
大致处理流程如下图:
消息队列就是保存消息的容器
可以随时去消息队列容器中取数据,不用讲究实时性。
与传统RPC区别就是消息异步分发,非实时。
生产者:负责消息的产生和发送到MQ容器
消费者:负责从MQ获取消息,并进行相应处理,不关心有多少个消费者。
MQ容器:负责消息存储、确认、重试等,一般包含多个队列。
特性:
异步性:将耗时的同步操作,通过以发送消息的方式,进行了异步化处理。减少了同步等待的时间
松耦合:减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节,只要定义好消息的格式就行。
分布式:通过对消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性(当然消息队列本身也可以做成分布式集群)。
可靠性:消息队列一般会把接收到的消息存储到本地硬盘上(当消息被处理完之后,存储信息根据不同的消息队列实现,有可能将其删除),这样即使应用挂掉或者消息队列本身挂掉,消息也能够重新加载。
应用场景:
物联网:例如智能家居,通过MTQQ这个协议,再加上IBM的WebSphere MQ 消息队列管理器,可以轻松的实现物联网管理。
推送/聊天应用系统:消息队列+通信协议,消息队列可以实现消息的订阅,以及消息的分发,通过消息队列可以进行点对点推送,一对多广播式推送。
日志监控:消息队列会使得我们处理日志的异步记录等等得到很好的解决
流量削峰:一般在秒杀、团抢活动、短信群发等场景使用,秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉,加入消息队列,假如消息队列长度超过最大数量,则直接抛弃用户请求告知用户系统忙,稍后重试;秒杀业务根据消息队列中的请求信息,再做后续处理。
异步处理:也就是刚刚开头嗨小叔说的项目中的那种情况,不讲究时效性,异步处理信息。
流行的消息中间件与通信协议:
AMQP:http://www.amqp.org/node/
Apache Qpid(Apache开发的,是AMQP的实现):http://qpid.apache.org/
ActiveMQ:http://activemq.apache.org
ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景
RabbitMQ(AMQP协议的实现):http://www.rabbitmq.com/
RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
RocketMQ(阿里巴巴):https://github.com/alibaba/rocketmq?spm=5176.doc29532.2.1.I7qrLz
STOMP Protocol Specification:http://stomp.github.com/stomp-specification-1.2.html
HornetQ(JBoss的MQ):http://www.jboss.org/hornetq
Open JMS:http://openjms.sourceforge.net/
ZeroMQ:http://www.zeromq.org/
ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。
Kafka(不成熟的MQ,可看作是日志处理系统):http://kafka.apache.org/
WebSphere MQ(IBM产品):https://www.ibm.com/developerworks/cn/downloads/ws/wmq/
在之前嗨小叔介绍过Redis(),其实Redis也是支持MQ功能的。(一般处于发布者订阅者模式和生产者消费者模式的场景的消息队列,Redis都能够实现)Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
简单实战:
例子一、Rabbitmq+Spring:
1、生产者配置:
application-mq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 连接服务配置 -->
<rabbit:connection-factory id="connectionFactory" host="localhost" username="test" password="test" port="8098" />
<rabbit:admin connection-factory="connectionFactory"/>
<!-- queue 队列声明-->
<rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
<!-- durable:是否持久化
exclusive: 仅创建者可以使用的私有队列,断开后自动删除
auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
<!-- exchange queue binging key 绑定 -->
<rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
<rabbit:bindings>
<rabbit:binding queue="queue_one" key="queue_one_key"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。
rabbit:binding:设置消息queue匹配的key -->
<-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<bean id="jsonMessageConverter" class="mq.convert.FastJsonMessageConverter"></bean>
<-- spring template声明-->
<rabbit:template exchange="my-mq-exchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>
</beans>
2、fastjson messageconver插件实现
FastJsonMessageConverter .java
public class FastJsonMessageConverter extends AbstractMessageConverter {
private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);
public static final String DEFAULT_CHARSET = "UTF-8";
private volatile String defaultCharset = DEFAULT_CHARSET;
public FastJsonMessageConverter() {
super();
//init();
}
public void setDefaultCharset(String defaultCharset) {
this.defaultCharset = (defaultCharset != null) ? defaultCharset
: DEFAULT_CHARSET;
}
public Object fromMessage(Message message)
throws MessageConversionException {
return null;
}
public <T> T fromMessage(Message message,T t) {
String json = "";
try {
json = new String(message.getBody(),defaultCharset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return (T) FastJson.fromJson(json, t.getClass());
}
protected Message createMessage(Object objectToConvert,
MessageProperties messageProperties)
throws MessageConversionException {
byte[] bytes = null;
try {
String jsonString = FastJson.toJson(objectToConvert);
bytes = jsonString.getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(this.defaultCharset);
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);
}
}
3、生产者端调用
MyMqGatway .java
public class MyMqGatway {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendDataToCrQueue(Object obj) {
amqpTemplate.convertAndSend("queue_one_key", obj);
}
}
<!-- convertAndSend:将Java对象转换为消息发送到匹配Key的交换机中Exchange,由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。原文:Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. -->
4、消费者端配置
application-mq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 连接服务配置 -->
<rabbit:connection-factory id="connectionFactory" host="localhost" username="test" password="test" port="8098" />
<rabbit:admin connection-factory="connectionFactory"/>
<!-- queue 队列声明-->
<rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
<!-- exchange queue binging key 绑定,根据绑定的值取数据 -->
<rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
<rabbit:bindings>
<rabbit:binding queue="queue_one" key="queue_one_key"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
<rabbit:listener queues="queue_one" ref="queueOneLitener"/>
</rabbit:listener-container>
</beans>
<!--queues:监听的队列,多个的话用逗号(,)分隔
ref:监听器-->
5、消费者端调用
QueueOneLitener.java
public class QueueOneLitener implements MessageListener{
@Override
public void onMessage(Message message) {
System.out.println(" data :" + message.getBody());
}
}
MQ消息中间件有很多,就不一一讲解,这边从网上摘抄了一个例子。仅供各位参考使用,由于之前项目IBM的MQ源码拿不出来,所以就不在这边放,如果后面有机会拿回来,再补上IBM的MQ源码部分,供大家学习参考。
以上是关于MQ消息队列(Message Queue)的主要内容,如果未能解决你的问题,请参考以下文章