mq如何保证高可用,解决重复消费、数据丢失问题和顺序性问题

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mq如何保证高可用,解决重复消费、数据丢失问题和顺序性问题相关的知识,希望对你有一定的参考价值。

参考技术A

rabbitmq有三种模式: 单机模式,普通集群模式,镜像集群模式

kafka架构:多个broker组成,每个broker是一个节点;创建一个topic,这个topic可以划分为多个partition,每个partition可以存在于不同的broker上,每个partition就放一部分数据。

它是一个分布式消息队列,就是说一个topic的数据,是分散放在多个机器上的,每个机器就放一部分数据。


kafka 0.8以前,是没有HA机制的,就是任何一个broker宕机了,那个broker上的partition就废了,没法写也没法读,没有什么高可用性可言。


kafka 0.8以后,提供了HA机制,就是replica副本机制。每个partition的数据都会同步到吉他机器上,形成自己的多个replica副本。然后所有replica会选举一个leader出来,那么生产和消费都跟这个leader打交道,然后其他replica就是follower。写的时候,leader会负责把数据同步到所有follower上去,读的时候就直接读leader上数据即可。kafka会均匀的将一个partition的所有replica分布在不同的机器上,从而提高容错性。

如果某个broker宕机了也没事,它上面的partition在其他机器上都有副本的,如果这上面有某个partition的leader,那么此时会重新选举一个新的leader出来,大家继续读写那个新的leader即可。这就有所谓的高可用性了。


写数据的时候,生产者就写leader,然后leader将数据落地写本地磁盘,接着其他follower自己主动从leader来pull数据。一旦所有follower同步好数据了,就会发送ack给leader,leader收到所有follower的ack之后,就会返回写成功的消息给生产者。

消费的时候,只会从leader去读,但是只有当消息已经被所有follower都同步成功返回ack的时候,这个消息才会被消费者读到。

kafka重复消费的情况:
kafka有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,下次重启时,从上次消费到的offset来继续消费。但是offset没来得及提交就重启,这部分会再次消费一次。


怎么保证消息队列消费的幂等性:

丢数据,mq一般分为两种,要么是mq自己弄丢了,要么是我们消费的时候弄丢了

拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理

写入一个partition中的数据一定是有序的,生产者在写的时候 ,可以指定一个key,比如指定订单id作为key,这个订单相关数据一定会被分发到一个partition中去。消费者从partition中取出数据的时候也一定是有序的,把每个数据放入对应的一个内存队列,一个partition中有几条相关数据就用几个内存队列,消费者开启多个线程,每个线程处理一个内存队列。

消息队列重复消费和数据丢失问题(石衫面试突击学习笔记)

消息队列重复消费问题


如何保证消息不被重复消费?(如何保证消息消费时的幂等性)

其实这是一个很常见的问题,这两个问题基本上可以连起来问。既然是消费消息,那肯定要考虑会不会重复消费?能不能避免重复消费?或者重复消费也别造成系统异常可以吗?这个是 MQ 领域的基本问题,其实本质上还是问你使用消息队列如何保证幂等性,这个是你架构里面需要考虑的一个问题

 

首先介绍一下什么叫做消息重复消费的问题:

 

kafka实际上有个 offset 的概念,就是每个消息写进去都有一个 offset ,这个代表他的序号,然后 consumer 消费了数据之后,每隔一段时间就会把自己消费过的消息的 offset 提交一下,代表我已经消费过了,下次要是我重启啥的,你就让我继续从上次消费的 offset 来继续消费
但是凡事都有意外,比如我们生产经常遇到的就是你的有时候重启系统,这里面看你的重启方式,要是有时候重启的比较急直接把进程 kill 了,再重启。这会导致 consumer 有些消息处理了但是还没有提交 offset ,这个时候会导致消息再消费一次
重复消费问题不大,怕的是你没有考虑到重复消费后,怎么保证幂等性的。
举个例子:假设你有个系统,消费一条数据就往数据库插入一条,要是你的一个消息重复插入两次,你不就插入两条数据了,这个数据不就有问题了?要是你第二次消费的时候,自己判断一下已经消费过了直接扔了,就能保证只有一条数据了
 
幂等性通俗点来说,就是一个数据或者一个请求给你重复来多少次,你都得保证对应的数据是不会发生改变的,不能出错。

 

所以第二个问题来了,怎么保证消息队列的幂等性?

其实还是的结合业务来思考,这里有几个思路:

  1. 比如你要拿个数据写库,你先根据主键查询一下,如果这数据有了,就不要插入了,就update一下

  2. 如果是写redis,那没有关系,每次都是set天然的幂等

  3. 如果不是上面两个场景,实际稍微复杂一点,你需要让生产者发送消息的时候,每条数据加上一个全局的唯一id,类似于订单id之内的东西,然后你这里消息到了之后,先根据这个idredis里面查询一下,判断一下之前有消费过吗?如果没有消息就进行处理,然后把id写入redis。如果消费过来,就不要进行处理,保证别重复处理相同消息即可

具体如何保证MQ的消息幂等,还得的结合具体业务进行分析


如何保证消息可靠性传输(如何处理消息丢失问题)?


分析:消息丢失是肯定存在的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条,不能对就是说的重复消费和幂等性维妮塔,不能少说的是数据别丢失了,这个问题是使用 MQ 必须要考虑的
 
这个数据丢失一般分为两种,要么是 MQ 自己丢了,要么就是我们消息的时候丢了。我们从 rabbitMQ kafka 分别来分析一下


RabbitMQ

生产者弄丢了数据

生产者发送数据到 RabbitMQ ,可能数据再半路就丢失了,因为网络的问题,都有可能
此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitQ 事务( channel .txSelect ),然后发送消息,如果消息没有被成功被Rabbit MQ 接收到,那么生产者回到异常报错,此时就可以进行回滚事务( channel .txRollback , 然后重试发送消息,如果收到了消息就可以提交事务( channel .t x Commit )。但问题是, RabbitMQ 事务机制一搞,基本上吞吐量就会下来,因为太消耗性能

 

所以一般来说,如果你要确保说写 RabbitM Q的消息不丢失,可以开始 confirm 模式,再生产者哪里设置开启 confirm 模式之后,你每次写都会分配一个唯一的 ID ,然后如果写入了 RabbitMQ 中, RabbitMQ 会给你回传一个 ack 确认消息,告诉你这个消息 ok 了。如果 RabbitMQ 没有处理这个消息,会回调你的n ack 接口,告诉你这个消息接收失败了,你可以进行重试而且你可以结合这个机制在自己的内存里面维护每个消息 ID 的状态,如果超过一定的时间还没有接收到这个消息的回调,你就可以重发
 
事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务后会阻塞在哪里,但是 confirm 机制是异步的,你发送那个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收到后会异步回调你一个接口通知你这个消息收到了
所以一般在生产者这快避免数据丢失,都是用 confirm 机制

 

rabbitMQ弄丢数据:

就是rabbitMQ自己弄丢了数据,但是你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化操磁盘,哪怕是 RabbitMQ 自己挂了,回复之后会自动读取之前存储的数据,一般数据不会丢失,除非是极其罕见的 RabbitMQ 还没有持久化,自己就挂了,但是可能会导致少量数据丢失,但是这个概率比较小。
 
设置持久化有两个步骤,第一个是传教 queue 的时候将其设置为持久化,这样可以保证 RabbitMQ 持久化 queue 的元数据,但是不会持久化queue里的数据;第二个是发送消息的时候见消息的 deliveryMode 是指为 2 ,这就是将消息设置为持久化,此时 RabbitMQ 就会将消息持久化到磁盘上去,必须要同时设置这两个持久化才行,哪怕是 RabbitMQ 挂了,再次重启,也会从磁盘上去重启恢复Queue,恢复这个Queue里面的数据
 
而且持久化可以跟 Rabbit MQ 那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知ack了,所以哪怕是持久化到硬盘之前, RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的
 
但是存在一种比较极端的情况会丢失数据,就是你消息写到了 RabbitMQ 里面但是还没有来得及持久化到磁盘里面,刚好这个时候 RabbitMQ 挂了,就会导致存在内存里的一点数据会丢失

 

 

消费者弄丢了数据:

rabbitMQ中,消费者弄丢数据只有一种可能,就是使用了默认autoAck机制,当你的消费者使用默认机制消费数据时,刚好你接收到一条消息的时候,你的消费者挂了,因为你用的是自动提交机制,此时RabbitMQ会认为你的消息消费了,就会给你传下一条消息过来消费,所以造成了消息丢失

 

消费者端进行消息消费需要改成手动ack,只有当你程序正常运行完后,你才会收到去提交akc,所以不会导致消息丢失


kafka:

消费端弄丢了数据:

大家都知道 Kafka 会自动提交 offset ,那么只要关闭自动提交offset,再处理完之后手动进行提交,就可以保证数据不会丢失。但是此时还是会有重复消费,比如你刚处理完数据,还没提交offset结果自己挂了,此时肯定会重复消费一次,需要自己保证幂等性就好。

 

kafka弄丢了数据:

这块有一个比较常见的一个场景,就kafka某个 broker 宕机,然后重新选举 partition leader 时。大家想想要是此时其他的 follower 刚好还没没有同步数据,结果此时的leader就挂了,然后选举某个 follower 成leader之后,他就少了一部分数据,这样会造成数据丢失。

所以此时一般是要求起码设置如下4个参数:

 

  1. 给这个topic设置replication.factor参数:这个值必须大1,要求每个partition必须至少有2个副本

  2. 在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这是要求一个leader至少感知到有至少一个follower跟自己保持联系,没有掉队,这样才能确保leader挂了还有一个follower

  3. 在producer端设置acks=all:这个是要求每条数据,必须是写入所有的replica之后,才能认为是写入成功

  4. 在producer端设置retries=MAX:这是是要求一旦写入失败,就无限重试,卡在这里

 

在生产环境就是要按照上述的要求进行配置,这样配置后,至少要做kafka broker 端就可以保证再 leader 所在的 broker 发生故障,进行 leader 切换时,数据不会丢失

 

生产者端:

如果你按照上面的思路设置了ack =all ,一定不丢失,要求是你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才会认为本次写成功了。如果没有满足这个条件,生产者会自动不断的重试,重试无限次


以上是关于mq如何保证高可用,解决重复消费、数据丢失问题和顺序性问题的主要内容,如果未能解决你的问题,请参考以下文章

MQ那点破事,消息丢失重复消费消费顺序堆积事务高可用....

消息中间件(消息队列MQ)简介

关于MQ的几件小事如何保证消息不丢失

3、rabbitmq如何保证消息不被重复消费

Kafka在高并发的情况下,如何避免消息丢失和消息重复?kafka消费怎么保证数据消费一次?数据的一致性和统一性?数据的完整性?

消息队列重复消费和数据丢失问题(石衫面试突击学习笔记)