消息队列常见使用场景梳理

Posted 码代码的小司机

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列常见使用场景梳理相关的知识,希望对你有一定的参考价值。

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。本文主要阐述消息队列常见业务场景和技术方案。部分内容网上摘录!


1 何时需要使用消息队列


当你需要使用消息队列时,首先需要考虑它的必要性。可以使用mq的场景有很多,最常用的几种,是做业务解耦/最终一致性/广播/错峰流控等。反之,如果需要强一致性,关注业务逻辑的处理结果,则RPC显得更为合适。


2、consumer消费到重复消息怎么办?

消息消费模式

消息消费一般存在三种模式:最多一次,最少一次和有且仅有一次。


最多一次

这种可靠性最低,不管消费是否成功,投递一次就算完了。这种类型一般用在可靠性不高的场景中,比如我们一个对日志分析展示的场景,如果这种日志分析出现一定的缺失对业务也影响不大,那我们可以使用这种方式,这种方式性能最高。


最少一次

基本上所有追求可靠性的消息队列都会采用这种模式。因为网络是不可靠的,要在不可靠的网络基础上构建可靠的业务,就必须使用重试来实现,那么重试就有可能引入重复的消息。


有且仅有一次

这是人们最期望的方式。也就是我如果真正的处理失败了(业务失败)你才给我重发,如果仅仅是因为网络等原因导致的超时不能给我重发消息。但是这种仅仅靠消息队列自身是很难保证的。不过借助一些其他手段,是能达到有且仅有一次的『效果』(MQ的幂等检查)。


通过上面的描述,我们知道有且仅有一次的消息投递模式是很难达到的,那如果我们需要消息的可靠性,就必须接受重复消息这个事实。那么对于重复消息到底该怎么办呢?下面会列出一些场景和解决方案:


处理方式

不处理

这也算解决方案么?我当然不是说所有的重复消息都可以不处理的,但是是有场景是可以的。比如我们有一个缓存,数据库更新之后我们发送一条消息去将缓存删掉,等下一次数据访问的时候缓存没有命中,从数据库重新加载新数据并更新缓存。这里的消息就是删除缓存的消息,那么删除缓存这个消息就是可以接受重复消息的,这种重复消息对业务几乎没有影响(影响也是有,可能会稍微降低缓存命中率),我们权衡一下处理重复消息的成本和这个对业务的影响,那么不处理就是个最佳方案了。可能有同学说,降低缓存命中率也不行啊,还是得解决。那么我们看这个重复消息会降低多少命中率呢?那就得看重复消息多不多呢?重复消息一般是网络不稳定导致的,这在内网里这种情况其实并不常见,所以我觉得是可以接受的。


业务处理

有同学讲这不是废话么?重复消息当然是我们业务处理啊。我这里说的业务处理是说有很多业务逻辑自身就是能处理重复消息的,也就是有很多业务逻辑本来就是幂等的。这里有个题外话:即使我们不使用消息,也要尽量将我们的接口设计为幂等的。比如我们有一个创建新订单的消息,接到消息后会向数据库保存新订单。那么如果我们接到了重复订单(订单号相同),这样的订单肯定是不能保存的,但是这里切记一点,虽然最终我们不会保存两个一样的订单,但是收到重复订单的时候你就回复成功就可以了,不要抛出异常,因为抛出异常一般会认为是消息消费失败,又会重发。这在早期我们很多同学犯这个错误,就直接将DuplicateKeyException异常抛出了(其实对于接口幂等设计时也是一样,第二次重复调用的时候你返回成功的响应就行了,如果要告诉人家是重复的也在另外的字段告诉,而不是标识成功或失败响应的地方标识,这样会让请求方的处理代码更舒服些)。


2、consumer消息如何顺序消费?

消息投递的顺序是消费者关心的第二个问题。很遗憾,实现顺序消费的成本也是非常高的,所以大多数消息队列没有提供顺序消费模式。Kafka因为它独特的存储模型,所以提供了顺序消费这种方式,但是也是有其他限制的。那么如果我要求顺序消费该怎么办呢?


处理方式

不处理

这个我就不啰嗦了,其实这个场景可以直接借用上面重复消息里的场景,删除缓存的消息先发的后到是没有多大关系的。


业务处理

和上面一样,绝大多数业务逻辑是本身就是能处理顺序的。比如我们的交易系统里有很多很多状态机,状态机有严格的状态扭转流程。比如我们的支付状态机,我们从待支付->支付完成->退款中->退款完成。那假设现在我们是待支付状态,然后用户支付后又立即申请退款,那么有可能退款中的消息比支付完成的消息先到(这种几率也是非常非常低的),这是不可以的,不满足状态机扭转条件,所以我们可以抛出异常告诉消息队列消费失败即可,等到后面支付完成的消息到达后将状态扭转为支付完成,然后等到退款中的消息到之后才将状态扭转为退款中。


额外字段

不要求严格有序

那么如果我们的系统没有状态机这种东西,靠业务逻辑不好处理顺序该怎么办呢?那我们可以借助额外的字段来处理了。一般在数据库设计中,我们都建议每个数据库表都有这样两个字段:created_time和updated_time,也就是这条数据的创建时间戳和更新时间戳。那么如果我们不要求消息严格有序的时候就可以借助updated_time字段来控制顺序了。比如我们接到一条消息,然后需要更新数据库,然后我们发现消息中携带的时间戳比我们数据库中记录的时间戳还小呢,那这条消息我不用消费了,直接返回成功就行了。使用这种方式有两个限制:1. 不要求严格有序,只要有序就可以了,中间可能少几条的更新对业务没有影响。 2. 服务器之间的时间不能出现较大的偏差(这个通过时间同步服务一般都能保障)。


严格有序

那么如果我们要求严格有序呢?就是中间不能出现缺口。那么这种就不能依靠时间戳了,那我们可以添加一个整型的version字段,消息里也携带了一个version字段,每次更新数据的时候我们采用这种更新方式(乐观锁): update tbl set ......, version=version+1 where version=@message.version。如果这条语句没有更新成功,则返回行数就不为1,这个时候我们抛出异常让消息队列重试,等到合适的消息到来的时候就会更新成功了。使用version字段这种方式可能就要producer和consumer进行协调了(其实就是有些耦合了),因为消息里也要携带version字段。但是设计这个version字段的时候也要一些考虑,我更建议的更新方式是update tbl set ......,version=@message.newversion where version=@message.oldversion。这样如果发送消息的version因为某些原因(比如有些更新并不发送消息)没有严格递增两边也可以兼容。还有一点是,要控制consumer端数据的写入点。比如我们的处理消息消费的地方更新数据外还有另外一个地方更新数据,这个地方将version给更新了,那么就会导致producer和consumer版本不一致,最后导致怎么也同步不起来了。而producer方也要控制,比如如果我们人肉直接去表里修了数据可能就没有消息发出了。

并发更新

既然说到version,这里还提一下用version来处理并发更新吧。在去哪儿有很多场景里会将数据库当做key/value存储使用,比如一个订单,我们的表设计可能是订单号,订单内容(一个结构化json),created_time, updated_time, version。我们不再使用列的方式来存储订单,至于这种方式的优点和缺点就不在本文讨论之列了。那么如果来了一条订单更新消息,我们需要对这个结构化json进行更新该怎么处理呢?会先读取这个json,然后将消息里的内容与json进行merge操作,然后将merge结果写回数据库。那如果我们在merge的过程中订单已经被更新了怎么办,这就涉及并发更新控制的问题,如果不加以控制则可能导致更新被覆盖。那我们一般采取的方式是:


这种方式就可以避免并发更新冲突覆盖的问题了,但是冲突之后怎么办呢?一般会采取重试的办法。我们会发现这种并发冲突的概率并不会很大,而且出现之后只需要重试一下基本上就可以处理了:


我们发现这种处理方式在我们的场景中一遍又一遍的出现,那我们也将这种方式直接内置到组件之中了(NeedRetryException):


异步处理

有的时候我们接到消息后并不是在接收消息的线程里处理消息,比如我们接到消息后会发起一个异步的服务调用,那么我们并不能立即知道这个异步服务调用的返回结果。它的返回结果是在另外的线程里,但是消息队列需要知道消息的消费结果。一般消息队列的consumer client会自动的返回消费结果,但是因为异步的方式切换了线程,所以需要应用显式的告诉结果,这就需要使用显式ack的机制了



以上是关于消息队列常见使用场景梳理的主要内容,如果未能解决你的问题,请参考以下文章

消息队列常见的几种使用场景介绍!

消息队列常见的 5 个应用场景

消息队列常见的 5 个应用场景

消息队列常见的使用场景

消息队列常见的几种使用场景介绍

常见的 5 种 消息队列 使用场景