聊聊消息队列
Posted OmniStack
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊消息队列相关的知识,希望对你有一定的参考价值。
背景:随着互联网技术的高速发展,分布式技术越来越多的应用在相关项目中,而消息队列作为分布式系统通信的基础组件得到了广泛的应用。
在异步处理、系统/模块之间的解耦和高并发场景下的流量控制等方面,消息队列发挥了重要的作用。
本人在工作中对消息队列亦多有涉及,也研究过几款开源的消息队列,如:LinkedIn的Kafka,阿里系的RocketMQ及其云化产品ONS和MNS等。
本文不讨论某一具体的消息队列,而是试着在宏观层面聊聊消息队列的一些话题。
消息队列的应用场景
异步:很多场景下,系统不需要立即处理消息,此时可以将消息存入队列中,并在之后的适当时间再进行处理。
比如,在分布式环境中,统计A服务调用B服务的次数。此时,可将调用日志写入消息队列,由C服务读取消息队列的日志消息来计算调用次数。如下图:
解耦:不同系统甚至异构系统避免直接依赖耦合。
比如,如果A服务直接依赖B服务,当B服务出现异常或者故障时,将影响A服务的稳定。
引入消息队列之后,A服务不直接依赖B服务,A服务发送的消息将保存在消息队列中,B服务故障时不能处理消息,消息也不会丢失,不会对A服务造成影响,待B服务恢复之后再处理,这样就实现了A服务和B服务的解耦。如下图:
流量控制:在高并发的环境中,服务请求瞬时大量增加,将对服务器造成极大的压力,如果引入消息队列,服务请求的大量增加将不会影响服务端的压力,避免系统崩溃。
典型的场景如电商每年的双十一活动等,如下图:
用户请求先写入消息队列而不是直接打到电商系统,如果队列已满,则拒绝后续的请求,如此就保护了后端电商系统不至于崩溃。
消息队列的架构图
消息队列从逻辑角度而言,可以分为四大模块:生产者模块、存储模块、消费者模块和配置管理模块,每个模块均可以集群方式部署。如下图所示:
生产者模块:主要负责接收上游系统发送过来的数据。
生产者模块主要解决的问题是,如何处理大量的连接请求,并且快速的处理发送过来的数据,将数据以适当的形式进行组装,然后发送给存储模块。
存储模块:主要负责存储生产者模块发送过来的数据,然后将存储的数据传递给消费者模块。
该模块需要解决如何序列化数据并存储,以及如何读取序列化的数据,该模块的性能对消息队列整体的性能影响较大。一方面,如果序列化及存储性能很高,那么接收数据的性能也会很高。
另一方面,如果读取序列化数据的性能很高,那么发送的性能也会很高,所以,该模块决定了消息队列的服务能力。
消费者模块:主要负责消费存储模块的数据。
该模块主要解决的是消费逻辑,比如:是广播消费还是集群消费等,也可以简单理解是广播还是单播。
Config Server:主要作为注册中心,存储组件的元数据,以及维护发送和消费关系等。
以上四大模块构成了消息队列的逻辑模型,如果进一步将各个模块分解的话,可以发现每一个模块都包含若干子模块。现分解如下:
生产者模块:主要有两个部分构成。一个是接收上游的数据;另一个是协议处理,负责将接收到的数据根据约定的协议进行解析,然后传递给存储模块。
存储模块:主要包括,数据接收处理、数据标识生成、数据写入和数据读取。该部分主要解决数据如何序列化及如何存储。
消费者模块:主要解决消费的逻辑处理,如:广播模式、集群模式和分组管理等。
各个模块的实现
该模块是生产者模块,根据上面的介绍,它需要实现如下功能:
1)接收上游系统产生的数据
2)使用适当的协议,解析数据包格式,组装数据
3)容灾
该部分需要处理的是大量的网络连接请求和协议选择。对于网络模型的选择,目前有select、poll和epoll等,这些模型各有优缺点和使用场景。
目前应用比较广泛的当属epoll模型,基于事件驱动的方式,典型的比如Mina框架、Netty框架,二者均是基于事件驱动,属于非阻塞的异步传输模式。
目前Netty社区很成熟,版本更新迅速,在互联网公司里应用很广泛,可以选择Netty。对于协议的选择,目前也有非常多的协议可供选择,比如:ProtoBuf、JSON、Thrift、Hessian等。
对于具体如何选择哪种协议,主要应该考虑两点:序列化/反序列化的时间和字节的大小。
对于网络传输而言,当然是字节越小越好,但是越小的字节,可能会导致压缩耗时过长,也就是如果压缩的越小,解压缩消耗的时间也越长,所以需要在这两者之间寻求一个平衡。
在容灾方面,对于Producer而言,其发送消息可能有默认的优先级,比如,默认优先发送到本地机房集群,本地机房宕机后再向其他机房发送。本地机房故障后,自动熔断。
我们可以直接选择使用Netty作为网络框架,至于协议,则可以根据已有的业务场景选择合适的协议。
该模块是存储模块,为了保证数据不丢失,必须做到高可用,为此必须在存储模块考虑常见的异常情况,如:系统Crash,机器断电死机等。
最简单的方式就是所有操作都是同步方式,同步的情况下,即便出现异常,使用方还可以尝试重试策略。根据上面的介绍,它需要实现如下功能:
数据接收处理:生产者模块接收的数据经过处理之后会传递给存储模块,由上面的描述可知,需要将该部分接收到数据按时间顺序排序进行串行化,这样能保持数据的入队和出队的一致性。
数据标识生成:简单的说就是为数据做好标识,类似数据记录的主键,该标识的作用主要是便于存储管理以及之后发送时的快速寻址。
数据写入和读取:该模块主要是将上述带标识的数据序列化到存储设备,考虑到写入的数据之后还要被读取,最好为写入的数据创建索引,这样在读取的时候将能快速寻址,快速读取数据。
消息队列的性能瓶颈主要取决于写入的性能,这个性能将决定消息队列的性能。
理论上,从速度上讲,文件系统最快,其次是分布式KV系统,最后是数据库,而可靠性正相反。具体选择哪种存储设备,要根据业务场景来做出合理的选择。
如果需要支持金融支付交易类等对可靠性要求很高的业务,那么可能数据库是比较好的选择;如果是支持日志类等对可靠性要求不是很高的场景,则可以选择分布式KV,如MongoDB、Hbase 及Redis,性能比数据库要好。
该模块是消费者模块,根据上面的介绍,它需要实现广播、单播等功能。
对于互联网的大部分应用来说,组间广播,组内单播是非常常见的情形。
消息需要通知到多个业务集群,而一个集群内有多台机器,只要一台机器消费这个消息就可以,这种方式就是属于组间广播,组内单播模式。
考虑这些常见的业务场景,一般比较通用的设计是支持组间广播,不同的组注册不同的订阅,组内的不同机器,如果注册一个相同的ID,则属于集群消费模式(单播);如果注册不同的ID,则属于广播消费模式(广播)。
消息队列无论是queue模式还是topic模式,其消费模式可概括为两种:竞争式消费和共享式消费。
点对点消费、集群消费属于竞争式消费,广播消费属于共享式消费。
对于集群消费模式,一条消息只要被集群内的任意一个消费者消费即可,而对于多条消息,集群内的消费者将共同分担消费。
比如有6条消息,集群内有3个消费者,那么可能每个消费者都消费2条,也可能是3个消费者分别消费1条、2条、3条,总共6条。
一般来说,消息队列应尽可能做到负载均衡,保证集群内的消费者尽可能均衡消费;对于广播消费模式,一条消息需要被集群内所有的消费者至少消费一次。
在消费者模块中,当消息队列需要将数据发送给消费者的时候,就涉及到数据是由消息队列推送给消费者,还是由消费者主动去拉取,两种方式各有优缺点。
推模式
该模式的优点很明显,就是消费者能实时的接收最新的消息,缺点也很明显,如果消费者的消费速度比发送者的速度慢,将造成消息在存储服务器上的堆积。
此时,服务器将推送给消费者的消息,消费者可能无法及时消费处理,造成消费者要么拒绝,要么抛出消费异常,虽然消息队列可以提供重试/重投递,但却会造成带宽的浪费。
拉模式
拉模式是由消费者主动向服务器发请求获取消息,由于主动权在消费方,所以,消费者可以主动控制消息消费的速度,避免消费者因为消费能力不足而导致的消费处理异常。
另一方面,该模式下,消费方可能无法准确的确定何时去拉取最新的消息,因为消费方无法准确的确定何时有最新的消息,也就无法及时的去拉取。
目前业界比较通用的做法是设定一个初始时间,然后以指数级时间增长去等待,但这样仍然无法保证能实时获取最新消息。在阿里云的消息队列MNS中,其提供了一种长轮询方案来优化。
具体做法是:在大并发的线程同时访问队列的情况下,如果队列里没有新的消息,那么其实不需要那么多线程同时去轮询,只需要有一个线程去轮询,其他线程等待,当轮询的线程发现队列里有新消息时,可以唤醒其他线程一起来获取消息,从而达到快速响应的目的。
通过阅读RocketMQ的源码可以发现,其内部只有Pull模式,Push模式也是通过长轮询实现的。
该模块通常作为注册中心存储组件的一些元数据,通常情况下,数据在存储模块中的偏移量以及发送和消费关系,可以考虑维护在该组件上。
比如,使用Zookeeper作为注册中心。该组件可作为独立的模块部署,并和各个组件建立长连接关系。
以上介绍了消息队列的整体架构及各个模块设计,下面我们再聊聊在分布式环境中,消息队列的其他话题。
比如如何保证消息的可靠投递,如何处理重复消息和消费确认,如何处理事务消息和顺序消息等话题。
关于消息队列的其他话题
在分布式环境下,由于网络等各种原因,消息可能存在丢失的情况,对有些情况,消息丢失不影响业务可以不做任何处理,而其他情况下,都需要保证消息不能丢失。
比较成熟的消息队列,一般都是在保证不丢消息的情况下,尽量减少重复投递。
如何解决消息丢失的问题,比较常用的做法就是,在发送消息之前,先将消息落地,然后再发送。
当失败或者不确定(如网络超时)时,由定时任务去轮询,最终一定能保证可靠送达。该种情况下,可能存在消息重复投递问题,可视具体业务场景做出相应的处理。
由于分布式环境的不可靠性,重复投递难以避免,除非允许消息丢失。
既然存在重复投递的情况,那就必须要确定是否必须处理重复的消息(有些场景下,可以不处理重复的消息),对于需要处理的重复消息,目前业界比较常用的做法有下面两种:
1)保证消费端消费的幂等性
2)在消费消息时,引入去重表,在去重表中插入消费成功的日志
如果业务端的消费是幂等的,那么无论消费多少条消息,其结果都一样,这是一种比较理想的情况。
多说一句,幂等性设计在分布式系统的设计中非常重要,我们在系统设计中,应该尽可能的保证幂等性。
关于如何进行幂等性设计,这是另外一个话题,本文不展开,感兴趣的读者,可以查阅相关资料。
如果业务端无法做到幂等,那么就可以引入去重表,在消费成功后,插入一条成功的日志,当有重复的消息时,和去重表的记录做比对,如果已经消费过了,那么就丢弃,否则重新消费该消息。
服务器将消息投递给消费者之后,消费者可能没有能力去处理该消息,对于该种情况下,应该允许消费者进行消费确认,也就是消费者主动ACK,并约定服务器下一次重投递的时间。
经典的转账场景:A账户向B账户转账100元,在分布式环境下,将可能出现数据不一致的情况,比如,A账户减少100元,但B账户却没有增加100元,这违背了事务一致性原则。
对于类似问题,阿里开源的RocketMQ对此给了解决方案。而业界对此的解决方案,大致有两种:
1)使用分布式事务
2)使用本地事务,然后做补偿
分布式事务的最大问题就是性能,无论是两阶段提交还是三阶段提交,性能都很难令人满意,所以很少有人会直接使用分布式事务来解决该问题。
使用本地事务较为简单,具体做法就是在本地事务中,在A账户扣钱的同时,将消息落库,因为是本地事务,可以保证该阶段操作的强一致性。然后发送消息,如何保证一定能成功发送消息就是上面提到的消息可靠投递的问题了。
消息投递的顺序在有些场景下也是很重要的,比如订单状态流转场景,每个状态流转有严格的顺序要求。不过,大多数消息队列都没有提供顺序消费模式,因为实现的成本非常高。如果需要保证顺序,通常情况下有两种做法:
1)引入版本号:生产者在发送消息时增加版本号,消费者消费时与自己当前的版本号作对比,如果接到的消息版本号比自己当前的版本号大,则消费,
否则,不做处理。这种方案从某种程度上来说,生产者和消费者发生了耦合。
2)使用状态机:一般涉及交易支付的系统都有严格的状态流转,比如订单状态流转,如果当前处于待支付状态,用户支付之后,可能又做退款申请,那么有可能退款申请的消息先于支付完成的消息到达,这种情况就不满足状态机的状态流转要求,可做消费失败处理,等待支付完成的消息到达后再进行消费处理。
总结
消息队列作为互联网后端基础设施之一,在异步、解耦及流控等方面应用广泛。本文从消息队列的背景和使用聊起,主要介绍了消息队列的架构,包括各个模块的设计实现、可靠投递、重复消息处理、消费确认、事务消息及顺序消息等话题。
希望能让大家对消息队列有一个整体的认识。本文主要是自己在工作及业余学习消息队列的一些积累,肯定有不少的错误和疏漏,欢迎大家批评指正,多多交流。
作者介绍
徐从伟
2016年8月加入买单侠基础架构团队,在此之前,曾在交通银行、花旗软件、1号店等公司任职。
现负责消息队列、缓存及安全体系方面的工作。本人对互联网中间件、并发编程和NIO异步编程比较感兴趣。
往期图文
以上是关于聊聊消息队列的主要内容,如果未能解决你的问题,请参考以下文章