如何保证消息队列里的数据顺序执行?

Posted Hollis Chuang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何保证消息队列里的数据顺序执行?相关的知识,希望对你有一定的参考价值。

使用MQ的时候,经常会有按顺序消费的需求,比如大数据团队为了做数据分析,会把数据库里数据同步到其他系统做一些数据统计分析。同步mysql的时候,为了保证数据同步的实时性,会在中间加一个MQ,多个线程来消费MQ里的数据。

这种同步一般是读取binlog数据,你在MySQL里增改删了数据,对应出来就是3条增改删binlog日志发送到MQ里面,消费的时候肯定必须要按照增改删的顺序执行。如果你换成删除、修改、增加,就导致数据乱套了。

图1 binlog同步

我们以kafka举例,看下哪些环节会出现数据顺序不一致情况,又怎么解决。

假设kafka分配了3个partition,kafka的一个特性就是,能保证写入一个partition中的数据一定是有顺序的。

生产者写的时候,可以指定一个key,比如是订单id作为key,这个id对应的数据一定会写到同一个partition中去,而且这个partition中的数据都是有顺序的。

图2 kafka partition

kafka的消费者开始消费partition中的数据,一个消费消费一个partition,一个partition只能被一个消费者消费,不会出现一个消费者同时消费多个partition的情况。假如现在有3个partition,你启动4个消费者,那么就会有一个消费者消费不到数据。

图3 一个消费者消费一个partition

到目前为止,每个消费者消费到的数据都是有顺序性的。但消费者内部如果是单线程的,效率就会比较低,如果生产者写入kafka的数据量比较大,消费不及时,就会出现消息堆积的情况,所以消费者需要多线程的方式运行。

假如消费者里启动了3个线程,并发的来消费数据,线程之间如果不做同步控制,还是会导致数据乱掉。

图4 消费者多线程消费MQ

那如何保证kafka消费者多线程按顺序消费数据呢?

多个线程不能直接拿数据去处理,此时我们可以在同步系统中搞多个内存队列,消费者拿到数据之后,根据每条数据的key做hash取模,把相同id的数据分配到同一个内存队列中去。

每个内存队列里的数据都是有顺序性的,给每个内存队列都对应一个线程,去消费内存队列中的数据。

假如有3条增改删的数据,都是对同一个id的处理,那么hash取模后就会写入到同一个内存队列里去,由同一个线程去消费,然后按顺序写入数据库中。

如果消费者按照单线程消费处理,一条数据耗费几十毫秒,1秒钟只能处理十几条数据,吞吐量就会非常低。如果开启多线程的方式处理,就会几倍的提高吞吐量,同时也保证了数据的顺序性。

整个流程按这样的设计方案来处理,就可以保证数据的顺序性。

有道无术,术可成;有术无道,止于术

欢迎大家关注Java之道公众号

好文章,我在看❤️

以上是关于如何保证消息队列里的数据顺序执行?的主要内容,如果未能解决你的问题,请参考以下文章

高薪程序员&面试题精讲系列127之如何设计一个消息队列?如何保证消息的执行顺序?

RabbitMQ 如何保证消息顺序 --- 2022-04-03

消息队列中,如何保证消息的顺序性?

mq中如何保证消费者顺序消费

RocketMq如何保证消费顺序

2020-12-25:MQ中,如何保证消息的顺序性?