消息队列RabbitMQ原理消息队列保证幂等性,消息丢失,消息顺序性,以及处理消息队列消息积压问题

Posted haijiao12138

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列RabbitMQ原理消息队列保证幂等性,消息丢失,消息顺序性,以及处理消息队列消息积压问题相关的知识,希望对你有一定的参考价值。

消息队列

  • 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已

  • 常见的消息队列

    • RabbitMq ActiveMq ZeroMq kafka等;

  • 为什么使用RabbitMq?

    • RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言的。

    • 可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;

    • 高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性;

    • 集群部署简单,正是应为Erlang使得RabbitMQ集群部署变的超级简单;

    • 社区活跃度高,根据网上资料来看,RabbitMQ也是首选;

  • 工作机制

    • 生产者:消息的创建者,负责创建和推送数据到消息服务器;

      消费者:消息的接收方,用于处理数据和确认消息;

      代理:就是RabbitMQ本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

    • 原理流程:就是生产者发送消息到虚拟主机,虚拟主机把消息交给指定的交换机,交换机按照规则扔给消息队列进行存储,消息队列等待消费者来消费。

    • 消息持久化(三个条件)

      • 投递消息的时候durable设置为true,消息持久化;

      • 消息已经到达持久化交换器上;

      • 消息已经到达持久化的队列;

    • 持久化工作原理

      • Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。

    • 持久化缺点:

      • 消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。

MQ用途

  • 同步变异步消息

    • 用户下单完成后,发送邮件和短信通知。 运用消息队列之后,用户下单完之后,下单信息写入数据库,再写入消息队列,发送邮件和发送短信各自去消息队列进行读取,节省时间,提高效率。

  • 应用解耦

    • 场景:用户下单后,订单系统需要多渠道通知用户。 a、下单服务系统:用户使用下单服务后,将下单信息写入数据库,下单成功。 b、短信服务系统:用户下单后,将短信信息写入消息队列,以发送短信信息通知用户交易信息。 c、邮件服务系统:用户下单后,将邮件信息写入消息队列,以发送邮件信息通知用户交易信息。 这样,如果微信通知不能正常使用,也不影响用户下单,用户下单后,只用把下单通知信息写入消息队列,不用关心后续操作,实现了订单系统和通知系统的解耦。

  • 流量削峰

    • 一般在秒杀或者团购活动中使用。 场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。针对这个问题,一般需要在应用前端加入消息队列。 a、可以控制活动的人数 b、可以缓解短时间内高流量压垮应用 用户的请求,服务器接收后,首先写入消息队列,如果消息队列的数量大于最大的数量,则直接抛弃用户请求或者跳转错误页面。

消息流程

  • *生产者(producer)*把消息发送给交换机。当你创建交换机的时候,你需要指定类型。交换机的类型接下来会讲到。

  • *交换机(exchange)*接收消息并且负责对消息进行路由。根据交换机的类型,消息的多个属性会被使用,例如路由键。

  • *绑定(binding)*需要从交换机到队列的这种方式来进行创建。在这个例子里,我们可以看到交换机有到两个不同队列的绑定。交换机根据消息的属性来把消息分发到不同的队列上。

  • *消息(message)*消息会一直留在队列里直到被消费。

  • *消费者(consumer)*处理消息。

RabbitMQ核心概念

  • 生产者(Producer):发送消息的应用。

    消费者(Consumer):接收消息的应用。

    队列(Queue):存储消息的缓存。

    消息(Message):又生产者通过RabbitMQ发送给消费者的信息。

    连接(Connection):连接RabbitMQ和应用服务器的TCP连接。

    通道(Channel):连接里的一个虚拟通道。当你通过消息队列发送或者接收消息时,这个操作都是通过通道进行的。

    交换机(Exchange):从生产者那里接收消息,并根据交换类型分发到对应的消息列队里。要实现消息的接收,一个队列必须绑定一个交换机。

    绑定(Binding):绑定是队列和交换机的一个链接。

RabbitMQ几种集群模式

  • 单机模式

  • 普通集群模式

  • 镜像集群模式

保证幂等性

  • 假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?

  • 一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

    • 解决

      • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update一下好吧

      • 通过redis,那没问题了,反正每次都是set,天然幂等性

      • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可;

      • 还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据;

消息丢失

  • 生产者将数据发送到rabbitmq的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能

  • 生产者端丢失数据解决

    • rabbitmq事务机制(同步的):生产者发送数据之前开启rabbitmq事务(channel.txSelect),然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;如果收到了消息,那么可以提交事务(channel.txCommit)

    • 开启confirm模式(异步的),在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq中,rabbitmq会给你回传一个ack消息,告诉你说这个消息ok了。如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发

  • rabbitmq弄丢了数据 就是rabbitmq自己弄丢了数据,这个你必须开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,哪怕是rabbitmq自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。

    • 步骤一:一个是创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里的数据;

    • 第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时rabbitmq就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,rabbitmq挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。哪怕是你给rabbitmq开启了持久化机制,也有一种可能,就是这个消息写到了rabbitmq中,但是还没来得及持久化到磁盘上,结果不巧,此时rabbitmq挂了,就会导致内存里的一点点数据会丢失。

  • 消费者端丢失数据

    • rabbitmq如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,rabbitmq认为你都消费了,这数据就丢了。这个时候得用rabbitmq提供的ack机制,简单来说,就是你关闭rabbitmq自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那rabbitmq就认为你还没处理完,这个时候rabbitmq会把这个消费分配给别的consumer去处理,消息是不会丢的。

消息顺序性

  • rabbitmq保证数据的顺序性

    • 如果存在多个消费者,那么就让每个消费者对应一个queue,然后把要发送 的数据全都放到一个queue,这样就能保证所有的数据只到达一个消费者从而保证每个数据到达数据库都是顺序的。

如何处理消息队列积压问题

  • 积压分三个方面

    • broker producer consumer

  • broker

    • 处理业务能力极强 性能高 还可以水平拓展 不用担心

  • producer端优化:

    • 一般先执行自己端的业务逻辑 才发消息 检查是不是自己的业务逻辑耗时太多 (设置批量发送以及批量发送的大小可以改善)

    • 发生消息积压后 producer端服务降级 关闭一些非核心业务 减少消息的产生

  • consumer端的优化

    • 主要原因是:消费者的消费能力跟不上生产者的额生产能力

    • 扩容方案:利用临时消费者 消费原来队列中的消息 让消费者不做任何耗时的动作 将消息均匀写入创建的队列中 将更多consumer部署到更多的服务器来消费新创建的队列上的消息;

    • 等待积压的消息被消耗到正常水平 撤掉扩容服务器;

以上是关于消息队列RabbitMQ原理消息队列保证幂等性,消息丢失,消息顺序性,以及处理消息队列消息积压问题的主要内容,如果未能解决你的问题,请参考以下文章

MQ消息队列的重复消费问题的通用解决办法以及幂等性的原理

Rabbitmq之发布确认高级回退消息备份交换机幂等性优先级队列惰性队列

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码

RabbitMQ 消息队列学习

消息队列怎么避免重复消费

基于消息队列(RabbitMQ)实现延迟任务