消息队列

Posted strong-fe

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列相关的知识,希望对你有一定的参考价值。

1、为什么要用消息队列

解耦、异步、削峰

技术图片

 

 

技术图片

 

 

A系统调用B系统、C系统,传统的调用是直接调用,但是当B系统说我不需要你提供数据了,这时候A需要改代码,C系统说我不需要某个字段了,这时候A也要改代码,如果又多了一个D系统,A又要写代码。为了实现解耦,引入消息队列,A将产生的数据丢到消息队列中,哪个系统需要 哪个系统就去取;
A系统调用B系统,B系统由于某个需要调用第三方接口超时,导致A系统响应速度慢,而B系统的好坏又不会影响业务逻辑,所以可以改为A异步调用B,A将消息丢到消息队列中,B系统订阅消息,实现A的快速响应;
当大量流量请求到系统A时,由于数据库的处理能力有限,造成数据库连接异常。使用消息队列,大量请求先丢到消息队列中,系统A使用按批拉数据的方式,批量处理数据,生产中,高峰期短暂的消息积压是允许的。

 

2、使用消息队列有什么缺点

系统复杂性增加:加了消息队列,需要保证消息不会重复消费,需要保证消息的可靠性,需要保证消息队列的高可用
系统的可用性降低:如果消息队列挂了,那么系统也会受到影响

 

3、RocketMQ和ActiveMQ的区别

ActiveMQ严格遵循JMS规范,可持久化到内存、文件、数据库,可用性高主要是主从,多语言支持,消失丢失率低;
RocketMQ持久化到磁盘文件,可用性非常高,支持分布式,只支持Java,消息理论上不会丢失;

 

4、MQ能否保证消息必达,即消息的可靠性(如何处理消息丢失的问题)?

 技术图片

 

 技术图片

 

 

丢数据,mq一般分为两种,一种是mq自己弄丢数据,一种是消费的时候弄丢数据。以rabbitmq为例。

(1)生产者弄丢数据

生产者将数据发送到rabbitmq的时候,可能数据就在半路给搞丢了,因为网络等问题。

解决方案一:用rabbitmq提供的事务功能,就是生产者发送数据之前开启rabbitmq事务(channel.txSelect)。
发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;
如果收到了消息,那么可以提交事务(channel.txCommit)。
rabbitmq事务机制的缺点:基本上吞吐量会下来,因为太耗性能。 解决方案二:开启confirm模式 在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq中,rabbitmq会给你回传一个ack消息,告诉你说这个消息ok了。
如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。
可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。 事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,
但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息rabbitmq接收了之后会异步回调你一个接口通知你这个消息接收到了。 所以一般在生产者这块避免数据丢失,都是用confirm机制的。 (2)rabbitmq弄丢了数据 rabbitmq自己弄丢了数据,这个必须开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,哪怕是rabbitmq自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。 设置持久化有两个步骤: 第一,是创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里的数据; 第二,是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时rabbitmq就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。 而且持久化可以跟生产者的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,rabbitmq挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。 (3)消费端弄丢了数据 rabbitmq如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,rabbitmq认为你都消费了,这数据就丢了。 这个时候得用rabbitmq提供的ack机制,简单来说,就是你关闭rabbitmq自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那rabbitmq就认为你还没处理完,这个时候rabbitmq会把这个消费分配给别的consumer去处理,消息是不会丢的。

 

5、如何保证消息不被重复消费啊(如何保证消息消费时的幂等性)?

技术图片

 

 

保证幂等性
(1)比如数据要写库时,你先根据主键查一下,如果这数据都存在了,就无需插入,update一下就可以了 (2)比如写redis,每次都是set,天然幂等性 (3)比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。 (4)比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据

 

6、如何保证消息的顺序性?

(1)rabbitmq保证数据的顺序性如果存在多个消费者,那么就让每个消费者对应一个queue,然后把要发送 的数据全都放到一个queue,这样就能保证所有的数据只到达一个消费者从而保证每个数据到达数据库都是顺序的。rabbitmq:拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。

技术图片

 

 

(2)kafka保证数据的顺序性
 kafka 写入partion时指定一个key,列如订单id,那么消费者从partion中取出数据的时候肯定是有序的,当开启多个线程的时候可能导致数据不一致,这时候就需要内存队列,将相同的hash过的数据放在一个内存队列里,这样就能保证一条线程对应一个内存队列的数据写入数据库的时候顺序性的,从而可以开启多条线程对应多个内存队列kafka:一个topic,一个partition,一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue即可

技术图片

 

7、如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:
(1)先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉

(2)新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量

(3)然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue

(4)接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据

(5)这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据

(6)等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息

 

以上是关于消息队列的主要内容,如果未能解决你的问题,请参考以下文章

消息队列属性及常见消息队列介绍

RabbitMQ 消息队列

Linux进程间通信 --- 消息队列

Redis(五)-特性-消息队列

redis消息队列有没有

到底啥是消息队列?Java中如何实现消息队列