37 生产案例:基于RocketMQ进行订单库数据同步的消息乱序问题及解决方案

Posted 鮀城小帅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了37 生产案例:基于RocketMQ进行订单库数据同步的消息乱序问题及解决方案相关的知识,希望对你有一定的参考价值。

一、消息乱序问题

1. 消息乱序的场景

前面说过了消息丢失、消息重复、消息处理失败三个问题,接下来是关于消息乱序的问题及解决方案。

场景:消息乱序的发生场景

大数据团队在获取订单数据库中的全部数据后,需要将订单数据保存一份在自己的大数据存储系统中,比如HDFS、Hive、HBase等。

以上的过程在优惠后会基于Canal中间件去监听订单数据库的binlog,也就是通过一些增删改操作的日志,然后把这些binlog发送到MQ里去。

然后大数据系统从MQ里获取binlog,落地到自己的大数据存储中去;再由大数据系统对存储的数据进行计算得到数据报表即可。

 2. binlog消息乱序导致的数据指标错误

数据指标错误

基于Canal中间件同步后的数据生成的报表在与订单数据库中的订单数据作对比的时候,是有可能发生数据对不上的错误问题的。

比如:某个订单的库存量在订单库存数据库里是100,但是在报表上却是0。

binlog消息乱序问题

基于Canal同步中间件的使用过程中,订单数据库的binlog在通过MQ同步的过程中,是会发生消息乱序的现象。

简单来说,在订单系统更新数据库时有两条SQL语句:

insert into order values(xx,0)

update order set xxvalue=100 where id=xxx

订单系统的流程是先 insert插入一条数据,然后再update修改 id=xx对应的那条数据的xxvalue为100。

这两条语句写入binlog时是先载入insert语句,然后再写入update语句。

对应的在大数据系统从MQ获取出来binlog的时候,确实先获取到了 update 语句的binlog,然后才获取到 insert 语句的 binlog 。

也就是说,此时会先执行更新操作,但数据不存在会更新失败;接着执行插入操作。也就是插入一条字段值为0的订单数据进去,最终大数据系统存储的对应id的value就是0了。

也正是因为这个消息乱序的原因,导致了大数据存储中的数据都错乱了。

3. 为什么基于MQ来传输数据会出现消息乱序

说明:发生消息乱序不是绝对的,这涉及到MQ底层的原理。

原理解析:

在MQ系统中,可以给每个Topic指定多个MessageQueue,然后生产者写入消息的时候,就会把消息均匀分发给不同的MessageQueue的。

当写入binlog到MQ的时候,可能会把insert binlog写入到一个MessageQueue里去,update binlog写入到另一个 MessageQueue里去。

 而当大数据系统去获取binlog的时候,在部署多态机器组成一个Consumer Group,对于Consumer Group中的每台机器都会负责消费一部分MessageQueue的消息,所以可能一台机器从ConsumerQueue01中获取到 insert binlog,另一台从ConsumerQueue02中获取到了update binlog。

而导致消息乱序执行的关键就在于,两台机器上的大数据系统并行的去获取 binlog ,就有可能一个大数据系统先获取到了 update binlog 去执行了更新操作,此时存储中没有数据,没法更新。

然后另一个大数据系统再获取到 insert binlog 去执行插入操作,最终导致只有一个字段值为0的订单数据。

 4. 消息乱序总结

消息乱序的问题发生:原本有顺序的消息,因为存在分发到不同的MessageQueue中去,然后不同机器上部署的Consumer可能会用混乱的顺序从不同的MessageQueue里获取消息然后处理。

二、RocketMQ中解决消息乱序问题的方案

1.第一步:让属于同一个订单的binlog进入一个MessageQueue

想要解决消息的乱序问题,最根本的方法就是让一个订单的binlog进入到同一个 MessageQueue里去。

场景分析:

以前面的例子来说,一个订单,先后执行了 insert、update两条SQL语句,也就对应了2个binlog。我们根据订单id来进行判断,在往MQ里发送 binlog 的时候,根据订单 id 来判断一下,如果 id 相同,就必须保证它进入同一个 MessageQueue。

具体的可以采用取模的方法,比如有一个订单id是1100,那么它的两个binlog,我们用订单 id=1100 对MessageQueue的数量进行取模。如:MessageQueue有15个,订单 id=1100对15取模,结果为5。

那么就将订单id=1100的binlog,都进入位置为5的MessageQueue里去。

通过这个方法,就可以保证让一个订单的 binlog 都按照顺序进入到一个 MessageQueue中去。

 2.第二步:有序的去获取binlog

在保证一个订单的binlog都进入一个MessageQueue之后,还不够。

要知道,mysql数据库的 binlog 一定是有顺序的。当我们从MySQL数据库中获取 binlog 的时候,也必须按照 binlog 的顺序来获取。

也就是说,当Canal从MySQL那里监听和获取 binlog ,那么当 binlog 传输到 Canal 的时候,也是有先后顺序的,先是 insert binlog ,然后是 update binlog。

 接着将 binlog 发送给 MQ 的时候,必须将一个订单的 binlog 都发送到一个 MessageQueue里去,而且发送过去的时候,也必须是按照顺序来发送的。

只有这样,最终才能让一个订单的 binlog 进入同一个 MessageQueue 的时候是有序的。

3. 第三步:Consumer有序处理一个订单的binlog

在MQ底层,一个Consumer可以处理多个 MessageQueue的消息,但是一个MessageQueue只能交给一个Consumer来进行处理,所以一个订单的 binlog 只会有序的交给一个 Consumer  来进行处理。

在完成以上三步的方案后,大数据系统就可以获取到一个订单的有序的 binlog ,然后有序的根据 binlog 把数据还原到自己的存储中去。

4.消息处理失败了不能走重试队列

由于Consumer处理消息的时候,可能会因为底层存储挂了导致消息处理失败,之前的逻辑是可以返回RECONSUME_LATER状态。然后 broker 会过一会儿自动给我们重试。

但是现在要解决消息乱序问题,就不能这么做了。在有序消息的方案中,如果遇到消息处理失败的场景,就必须返回SUSPEND_CURRENT_QUEUE_A_MOMENT状态,它的意思是先等一会儿,一会儿再继续处理这批消息,而不能把这批消息放入重试队列去,然后直接处理下一批消息。

5.扩展

(1)如果处理消息一直异常,不会让消费者一直等待下去的,而是重试一定次数后会进入死信队列。

三、RocketMQ顺序消息机制的代码实现案例

1.让一个订单的 binlog 进入一个MessageQueue

首先要实现消息顺序,必须让一个订单的 binlog 都进入一个MessageQueue 中,代码如下:

 在上面的代码片段中,有两个关键点,一个是发送消息的时候传入一个MessageQueueSelector,在里面要根据订单id 和MessageQueue数量去选择这个订单id的数据进入哪个 MessageQueue。

同时在发送消息的时候除了带上消息自己以为,还要带上订单id,然后MessageQueueSelector会根据订单id去选择一个MessageQueue发送过去,这样的话,就可以保证一个订单的多个binlog都会进入一个MessageQueue中去。

2.消费者保证按照顺序来获取一个MessageQueue中的消息的代码实现

消费者按照顺序来获取一个MessageQueue中的消息的逻辑如下:

 在上述的代码中,使用的是 MessageListenerOrderly 这个东西,它里面有 Orderly 这个名称。也就是说,Consumer会对每一个 ConsumerQueue,都仅仅用一个线程来处理其中的消息。

比如对 ConsumeQueue01 中的订单id=1100的多个 binlog ,会交给一个线程来按照 binlog 顺序来一次处理。否则如果 ConsumeQueue01中的订单id=1100 的多个 binlog 叫个 ConsumeQueue 中的多个线程来处理的话,那还是会有消息乱序的问题。

以上是关于37 生产案例:基于RocketMQ进行订单库数据同步的消息乱序问题及解决方案的主要内容,如果未能解决你的问题,请参考以下文章

34 生产案例:从RocketMQ底层原理分析为什么会发生重复发优惠券

浅谈RocketMQ的事务消息

39 生产案例:基于延迟消息机制优化大量订单的定时退款扫描问题示例代码

分布式事务基于RocketMQ搭建生产级消息集群?

38 基于RocketMQ的数据过滤机制,提高订单数据库同步的处理效率

40 RocketMQ的生产实践总结