可能是最好的消息队列入门教程

Posted CodingPartner

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了可能是最好的消息队列入门教程相关的知识,希望对你有一定的参考价值。

mq定义:通俗的说,就是一个容器,你把消息丢进去,不需要立即处理。然后有个程序去从你的容器里面把消息一条条读出来处理。消息队列可以是RocketMQ、RabbitMQ,Kafka之类的,也可以是数据库的一张任务表。


1. 什么时候使用mq?

1.1 使用场景

1)数据驱动的任务依赖
比如有3个task,后2个task需要依赖前面的执行结果,可以用mq,第2个task订阅第1个task已完成的消息,以此类推。

2)上游不关心执行结果 

比如火车票出票成功后,需要记录出票报表,给用户发短信通知等这些操作不影响主流程,完全可以交给mq来异步执行。还有像用户注册的场景等。

3)上游关心执行结果,但执行时间很长。

像微信支付,需要跨公网调用微信的接口,执行时间可能会比较长,但调用方又非常关注执行结果。 一般都会采用同步返回成功,异步回调实际结果的方式。 

1.2 mq优缺点

优点
1)上游执行时间短。
2)上下游解耦,模块之间都不相互依赖。

另外,新增一个下游消息关注方,上游不需要修改任何代码。 像订单状态,我们只需要把每个订单的状态写队列,其他业务方谁需要关注就去mq订阅就好了。 

3)流量缓冲,减轻系统压力。
4)可恢复性。系统的一部分组件失效时,不会影响到整个系统。即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

缺点
1)系统更复杂,多了一个MQ组件。
2)消息传递路径更长,延时会增加。

3)消息可靠性和重复性互为矛盾,消息不丢不重难以同时保证。
4)上游无法知道下游的执行结果,这一点是很致命的。也就是说,如果上游需要实时关注执行结果,mq是不适用的。


2. 使用mq需要关注的问题

2.1 消息可达性

关键两点:

  1. 消息需要落地,也就是持久化。

  2. 消息超时、重传、确认保证消息必达。

mq server收到client sender(简称cs)的消息后,需要回调cs消息已接收,否则cs需要不断重试。如果N次还失败,按失败处理;mq server发送消息(push or pull)给client receiver(简称cr)之后,如果mq server没收到cr的ack,也需要不断重试,一般采用指数退避的策略,先隔x秒重发,2x秒重发,4x秒重发,以此类推。 


2.2 消息幂等性

1)如何解决消息的重复发送?(参照2.1图)
3失败:client sender需要生成全局唯一的msg_id,1的重复发送由mq server做去重保证;
5失败:client sender发送的消息需要带入biz_id,服务消费端(也就是业务接收方)去重保证幂等。

2)业务方去重方法 

方法一
执行状态判断 + redis incr,执行成功写db。
比如说,要实现1分钟内同一消息只处理一次,可以在入口判断:
incr {msg_key}:2017-10-25_15:01,返回1才处理。


方法二
我们业务中通过写表加数据库锁来实现,消息有新建、执行中、完成、失败4种状态。  
如果表中没有消息记录,插入;  
如果已完成,返回成功;  
如果是新建或失败,先加锁,然后改成执行中,重试次数加1,调具体业务方法,返回成功失败的状态,最后释放锁;  
如果是进行中,判断消息是否被锁中,是的话接着:是否超时,未超时的话,返回false,表示执行中。如果超时需要释放锁;  
如果进行中且消息未被加锁,执行按失败处理,重新执行。

2.3 消息延时性

引入:如何实现在一段时间之后,完成一个工作任务?
比如,订单完成之后,如果用户在48小时内没有评价,就自动评价为5星。

1)传统实现方法
启一个定时任务,每隔一定时间扫描订单一次,将已完成的48小时之外的未评价订单评为5星。

2)如何避免频繁扫库又确保实时性?
采用环形队列(本质是个数组)。

3)环形队列设计思路

Current Index:  全局索引位置

Cycle Num:当Current Index第几圈扫描到这个Slot时,执行任务。

Task Set:任务集合,环上每1个Slot拥有1个Set。

可以将环形队列划分成3600个格子,每1格表示1秒,从1开始,然后计算Task属于哪1个Slot。如果希望3610秒之后,触发1个延时消息任务,那算下来对应到1圈后的第11个槽。 然后启动一个timer,Current Index不断移动,每秒移动1格(控制移动频率可以提高精度),同时,遍历当前Slot上的任务集合,每个Task的Cycle-Num如果为0,开始执行(发消息,可以用单独的线程来执行Task),并从任务集合中删除。如果不为0,Cycle-Num减1。

4) 注意

开源的MQ好像都不支持延迟消息,可以自己实现一个简易的“延时消息队列”,能解决很多业务问题,并减少很多低效扫库的cron任务。在定时任务框架中,也可以采取这种方式来进行调度的优化。

Netty中的HashedWheelTimer就是类似的原理,可以参照使用。

2.4 消息顺序性

1)顺序消费的问题
如果顺序消费,可能出现某个消息一直消费不成功,从而导致mq server中的队列堵住。  另一方面,并行度就会成为消息系统的瓶颈(吞吐量不够)。

2)如何确保顺序性
要顺序消费的消息需要指定一个唯一id,确保最终分配到同一个队列,这样可以按消息到达的先后顺序来消费消息。在RocketMQ中,需要复写MessageQueueSelector的select方法来指定分配策略。

2.5 消息削峰填谷

1)使用MQ依然存在的问题
下游(消息的订阅者)无法控制到达自己的流量,如果发送方不限速,很有可能把下游压垮。

2)如何做流量缓冲?
由push模式改成pull模式。
client receiver根据自己的处理能力,每隔一定时间,或者每次拉取若干条消息,实施流控,达到保护自身的效果。另外,下游消息接收方最好进行优化,提升整体吞吐量,以便预防消息堆积。

3. rocketmq浅析

3.1 网络架构

3.2 核心概念

消息发送到指定的topic,topic对应多个队列,队列均匀分布到不同的broker中;消息消费的时候,多个消费者共同消费指定topic的消息,一个队列只会被一个消费者消费。

注:下面的内容基于rocketmq 3.2.6。

3.2.1 nameserver

主要负责维护topic路由配置信息,producer和consumer都通过nameserver来决定消息最终发送和消费的broker。
相对来说,nameserver的稳定性非常高。原因有二:
1)nameserver互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用。
2)nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。

3.2.2 producer

1)与nameserver关系
单个生产者者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。
默认情况下,生产者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。该时间由DefaultMQProducer的pollNameServerInteval参数决定,可手动配置。

2)与broker关系
单个生产者和该生产者关联的所有broker保持长连接。
默认情况下,生产者每隔30秒向所有broker发送心跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。
如果Producer发送消息失败,会自动重试,重试的策略: 

重试次数 < retryTimesWhenSendFailed(可配置)

总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)

同时满足上面两个条件后,Producer会选择另外一个队列发送消息

3)负载均衡
每个生产者都会向topic队列轮流发送消息。 某些应用如果不关注消息是否发送成功,请直接使用sendOneway方法发送消息。

3.2.3 broker

1)与nameserver关系
单个broker和所有nameserver保持长连接。
心跳间隔:每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。
心跳超时:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。一旦连接断开,nameserver会立即感知,更新topc与队列的对应关系,但不会通知生产者和消费者。

2)负载均衡
一个topic分布在多个broker上,一个broker可以配置多个topic,它们是多对多的关系。 如果某个topic消息量很大,应该给它多配置几个队列,并且尽量多分布在不同broker上,减轻某个broker的压力。 topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大。

3)可用性
由于消息分布在各个broker上,一旦某个broker宕机,则该broker上的消息读写都会受到影响。所以rocketmq提供了master/slave的结构,slave定时从master同步数据,如果master宕机,则slave提供消费服务,但是不能写入消息,此过程对应用透明,由rocketmq内部解决。

这里有两个关键点:
一旦某个broker master宕机,生产者和消费者多久才能发现?受限于rocketmq的网络连接机制,默认情况下,最多需要60秒(待考证),但这个时间可由应用设定参数来缩短时间。这个时间段内,发往该broker的消息都是失败的,而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。
消费者得到master宕机通知后,转向slave消费,但是slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢的,一旦master恢复,未同步过去的消息会被消费掉(或者这里也可以同步写slave,但影响性能)。

4)可靠性
所有发往broker的消息,有同步刷盘和异步刷盘机制,总的来说,可靠性非常高。
同步刷盘时,消息写入物理文件才会返回成功,因此非常可靠。
异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电。

5)消息清理
扫描间隔:默认10秒,由broker配置参数cleanResourceInterval决定。
空间阈值:物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接受消息,broker打印出日志,消息发送失败,阈值为固定值85%。
清理时机:默认每天凌晨4点,由broker配置参数deleteWhen决定;或者磁盘空间达到阈值。
文件保留时长:默认72小时,由broker配置参数fileReservedTime决定。

3.2.4 consumer

1)与nameserver关系
单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。
默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInteval参数决定,可手动配置。

2)与broker关系
单个消费者和该消费者关联的所有broker保持长连接。
默认情况下,消费者每隔30秒向所有broker发送心跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。

3)负载均衡
集群消费模式下,一个消费者集群多台机器共同消费一个topic的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
RebalanceService线程每隔10秒钟会做一次基于topic下的所有队列负载:
遍历每个Consumer订阅的所有topic,然后获取每个topic下的所有队列,再根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等。

4)消费机制
本地队列
消费者不间断的从broker拉取消息(长轮询),消息拉取到本地队列,然后本地消费线程消费本地消息队列,只是一个异步过程,拉取线程不会等待本地消费线程,这种模式实时性非常高。对消费者对本地队列有一个保护,因此本地消息队列不能无限大,否则可能会占用大量内存,本地队列大小由DefaultMQPushConsumer的pullThresholdForQueue属性控制,默认1000,可手动设置。

轮询间隔
消息拉取线程每隔多久拉取一次?间隔时间由DefaultMQPushConsumer的pullInterval属性控制,默认为0,可手动设置。
注:轮询间隔越短,实时性越好;间隔调长,利于流量缓冲,消息高峰期可以适当调大间隔。

消息消费数量
每次接受本地队列的消息是多少条?这个参数由DefaultMQPushConsumer的consumeMessageBatchMaxSize属性控制,默认为1,可手动设置。

MQPullConsumer和MQPushConsumer的区别
其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。
Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。

区别是:

a. push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

b. pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

5)消费进度存储
每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒,可手动设置。


4. 参考文章


阿里 RocketMQ 安装与简介 
RocketMQ学习(五):Pull和Push 
分布式开放消息系统(RocketMQ)的原理与实践

以上是关于可能是最好的消息队列入门教程的主要内容,如果未能解决你的问题,请参考以下文章

我说这是新手入门最好的RabbitMQ学习笔记,谁赞成谁反对?

我说这是新手入门最好的RabbitMQ学习笔记,谁赞成谁反对?

消息队列RabbitMQ使用教程收集

rabbitmq - 不会获取队列中的所有消息

Android UI 线程消息队列调度顺序

[视频教程] 基于redis的消息队列实现与思考