如何保证消息队列里的数据顺序执行?
Posted Hollis Chuang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何保证消息队列里的数据顺序执行?相关的知识,希望对你有一定的参考价值。
使用MQ的时候,经常会有按顺序消费的需求,比如大数据团队为了做数据分析,会把数据库里数据同步到其他系统做一些数据统计分析。同步mysql的时候,为了保证数据同步的实时性,会在中间加一个MQ,多个线程来消费MQ里的数据。
这种同步一般是读取binlog数据,你在MySQL里增改删了数据,对应出来就是3条增改删binlog日志发送到MQ里面,消费的时候肯定必须要按照增改删的顺序执行。如果你换成删除、修改、增加,就导致数据乱套了。
图1 binlog同步
我们以kafka举例,看下哪些环节会出现数据顺序不一致情况,又怎么解决。
假设kafka分配了3个partition,kafka的一个特性就是,能保证写入一个partition中的数据一定是有顺序的。
生产者写的时候,可以指定一个key,比如是订单id作为key,这个id对应的数据一定会写到同一个partition中去,而且这个partition中的数据都是有顺序的。
图2 kafka partition
kafka的消费者开始消费partition中的数据,一个消费消费一个partition,一个partition只能被一个消费者消费,不会出现一个消费者同时消费多个partition的情况。假如现在有3个partition,你启动4个消费者,那么就会有一个消费者消费不到数据。
图3 一个消费者消费一个partition
到目前为止,每个消费者消费到的数据都是有顺序性的。但消费者内部如果是单线程的,效率就会比较低,如果生产者写入kafka的数据量比较大,消费不及时,就会出现消息堆积的情况,所以消费者需要多线程的方式运行。
假如消费者里启动了3个线程,并发的来消费数据,线程之间如果不做同步控制,还是会导致数据乱掉。
图4 消费者多线程消费MQ
那如何保证kafka消费者多线程按顺序消费数据呢?
多个线程不能直接拿数据去处理,此时我们可以在同步系统中搞多个内存队列,消费者拿到数据之后,根据每条数据的key做hash取模,把相同id的数据分配到同一个内存队列中去。
每个内存队列里的数据都是有顺序性的,给每个内存队列都对应一个线程,去消费内存队列中的数据。
假如有3条增改删的数据,都是对同一个id的处理,那么hash取模后就会写入到同一个内存队列里去,由同一个线程去消费,然后按顺序写入数据库中。
如果消费者按照单线程消费处理,一条数据耗费几十毫秒,1秒钟只能处理十几条数据,吞吐量就会非常低。如果开启多线程的方式处理,就会几倍的提高吞吐量,同时也保证了数据的顺序性。
整个流程按这样的设计方案来处理,就可以保证数据的顺序性。
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️
以上是关于如何保证消息队列里的数据顺序执行?的主要内容,如果未能解决你的问题,请参考以下文章
高薪程序员&面试题精讲系列127之如何设计一个消息队列?如何保证消息的执行顺序?