基于Kafka1.0消息可靠性设计之消费重试策略

Posted 唯技术

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Kafka1.0消息可靠性设计之消费重试策略相关的知识,希望对你有一定的参考价值。

 引言 

在企业应用中,Kafka是一款非常受用户青睐的开源产品,随着产品研发的逐步成熟,其应用的领域也越来越广泛,例如:企业信息归集(如:日志)、大数据流处理平台、基于事件通知的应用系统构建等方面,都是Kafka大展身手的地方;同时伴随其生态产品日渐丰满,让各类用户使用起来更加得心应手;但由于Kafka自身还存在bug、研发人员对Kafka系统理解不足或逻辑处理不当,很难避免生产问题的发生。本文总结了公司在使用Kafka过程中遭遇的种种问题,以及在修复和完善系统过程中展开的一些思考和实践探索。


本文将从利用Kafka构建事件驱动型应用的角度,来介绍怎样实现业务的高可靠、高可用场景;重点从消费过程进行介绍:在Kafka消费阶段出现异常后,如何优雅的解决业务上普遍性的问题,提供统一有效的处理方式去帮助业务开发摆脱繁琐的设计和复杂的实现。


在Kafka消费逻辑中,开发人员经常要面对各种运行期异常的处理,比如:


  • 消息消费时,调用下游服务失败后如何处理消息,采取立即重试还是缓存后再处理;

  • 本地重试,重试频率怎么选择,是否采用丢弃策略,丢弃的消息是否有告警以及追溯等问题;

  • 若采用缓存方式,是本地缓存还是集中式缓存(数据库/缓存);消费实例failover后能否在本地快速恢复状态数据;

  • 重试机制是否会导致消费堵塞,影响系统的吞吐;消息的同步重试常会带来性能的退化,如果采用异步处理,线程的复杂度以及消费进度的管理又成为棘手的难题;

  • 异常消息的保存问题;反复消费失败的消息,可能是其协议解析出错或者调用服务的故障引起,重试并不能最终解决问题;对此类重试达到最大次数依然失败的消息需进行额外存储,以方便问题排查和后期补偿,同时需对这类消息(死信)建立起不同用户的隔离机制。


以上列举了消费过程中常出现的问题,本文将从这些方向着手,详细讨论如何从消息组件角度去解决此类问题;保证消息可靠性的同时,尽可能优化系统性能,提升系统的并行能力和异常failover机制。


 当前现状 

对于消费异常,业务上常采用的几种异常处理方案:


  1. 消息进行本地重试,确保消息处理成功(阻塞的方式反复重试直至成功);处理逻辑简单,但会影响系统吞吐,增加平均延迟时间。

  2. 完全交由业务逻辑进行处理,例如:定时审计对账的方式,对执行失败的消息,统一调度进行补偿;消费逻辑里无需考虑失败的处理(记录错误日志即可),直接丢弃并继续后面的消费;这需要完善后台审计服务去回查业务数据,保证数据处理的完整性。

  3. 对消费失败的消息进行本地或远程保存(如数据库/缓存等),消息状态置为消费失败,并继续执行后面的消息;在本地或远程服务中配置特定任务来扫描和标记异常的消息,并进行消费补偿;该方式较为复杂需要考虑本地或远程存储逻辑(确保消息的持久化),以及解决消息的消费补偿问题。

  4. 利用消息系统自身的存储能力,建立消费组的重试队列(不同于原始消费队列);当遇到消费失败的情形,将消息发往重试队列,并继续向前执行后来的消息;消费者在订阅消费队列时,默认订阅该组的重试队列,保证进入重试队列的消息还能继续投递回当前的消费组。


目前,大致有以上几种方法来处理消费异常,我们在调研多款消息产品和业务需求时,也充分考察了各种处理方法的优缺点,并综合消息异常处理的诸多细节问题,基于kafka1.0引擎设计了适合本公司大部分应用场景的一套重试机制,在消费异常时采用此套重试方案去保障业务的高可靠和性能的要求。


 试用场景介绍 

唯品会内部大量业务场景适用于Push的消费模式(如图1), 用户向客户端注册消费监听Listener函数,在Listener函数中实现消息的消费逻辑,并通过其返回的状态码来确定客户端怎样执行消息的Ack逻辑(进度提交)。


图 1 Push消费模式


Push模式让开发者更聚焦业务逻辑的实现,无需过多考虑消息引擎相关的事务处理;消息相关的操作交由Push客户端来完成;对消费成功的消息,客户端执行消费进度的提交,而消费失败的消息按其状态返回值选择相应的异常处理策略;消息重试(依赖重试队列实现)便是其中一种重要的应对策略。


接下来将详细介绍重试队列和重试策略的相关内容。


 目标和价值 

重试队列的目标和价值:实际生产中消息重试方案会带来哪些价值,实现消息重试功能的意义有哪些呢?


  1. 消费失败时, 本地反复重试,会引起消费堵塞,造成系统吞吐的下降;而重试策略能很好的降低失败时单条消息连续重试的频率;执行流程上会先把失败的消息发送到重试队列进行持久化存储,一定延时后重新投递回消费端;而当前执行的消费队列会跳过该失败消息继续向前消费,从而提升消费端整体的吞吐性能和降低消息平均处理时延;

  2. 消费失败存在偶然性,常发生于下游服务瞬时的容量瓶颈、网络突发抖动或路由丢失等因素引起;偶发性消费失败,通过多次延迟再消费的方式,很大概率上能够成功(下游服务恢复,网络恢复正常等);采用重试策略能优雅的解决此类问题,通过多次消息的重试消费能保障处理的成功率;同时给异常的消息增加一定延时再投递,能很好的避免造成下游服务的雪崩效应;

  3. 业务消费失败后,常常难以解决消息存储和重新投递的问题,并且各类应用都存在相同的问题需要解决;重试策略从消息引擎角度出发,综合考虑了常见业务场景,抽象出统一的解决手段,极大的保障了消息的存储和再次投递的问题,提升消息的可靠性;

  4. 线上业务通常是同一消费组下部署多个消费实例,有些情况单条消息消费失败时,希望实现多服务实例之间的failover机制(A消费实例消费消息M失败时,有能力投递消息M至B或更多的服务实例去处理),而采用重试队列能解决消息在消费实例间的failover的问题;

  5. 便于用户实现基于消息的状态机模型,通过状态机模型来实现业务上严格顺序的消费场景;例如:在业务中对消息进行状态编排,每条消息都有自身的状态值,消息投递时可能出现消息的混乱,通过业务逻辑对消息的状态的判断来确定是否执行,若还未满足状态的消息,通过先发回重试队列,延后消费的方式来调整执行顺序;

  6. 消费达到最大重试次数后(VMS中默认20次),消息可以发往死信队列并持久化存储,方便对消息质量进行监控和审查。


 消息重试的功能需求 

  • 消费时设置消息重试状态后,消息被发往重试队列并持久化存储;

  • 根据消息重试次数采用缺省的延迟等待后,对消息进行重新投递,并由同消费组的实例继续执行;

  • 用户接口亦可配置每次重试的延迟时间(默认为递延的时间来确定),灵活的调整消息重新投递的规则;

  • 重试时提供消息级别的failover机制,比如A实例消费失败后,被发回重试队列,再次被投递时可路由至消费实例B,某种情况下可以提升处理的成功率;

  • 重试消费至最大次数后,消息被存储至每个消费组对应的死信队列进行永久保存;


 消息重试的执行流程 

  1. 消息被成功消费时,正常提交消费进度(group offsets)至Kafka集群;

  2. 消费失败时若设定消费状态为丢弃,执行和1相同的逻辑,正常提交当前进度;

  3. 消费失败时若设定消费状态为重试,此时客户端按照重试策略来执行重试逻辑;根据重试次数将消息发往不同延时等级的Topic(稍后介绍),延时时间到期后,消息被发送至重试队列;另外,客户端启动订阅时,也默认订阅了该消费组的重试队列,因此一旦消息进入重试队列,客户端会重新获取到此消息并立即执行消费逻辑;

  4. 重试的消息可能因协议错误或对应下游服务出错,一直处于消费的重试状态,为避免无限次重试造成系统开销,通过最大重试次数来防止无限循环(VMS中最大重试次数为20次),达到最大重试次数后,该消息被标记为死信并发往死信队列进行保存,供监控和审查系统进行处理。


基于Kafka1.0消息可靠性设计之消费重试策略

图 2 重试队列执行流程图


 重试功能相关模块介绍 

基于Kafka1.0来实现消息重试功能需要包括以下几个核心功能模块:


  • 消息的Fetcher服务; 包括了Kafka客户端基本的消息功能点,如队列分配策略,负载均衡机制,消息订阅方式,pull消息功能等;

  • 消费线程的管理模块;支持多线程方式去消费来自同一分区或多个分区的消息,支持更好的并发粒度;

  • 消费重试管理模块;通过用户Listener返回值来决定消息的后续处理,比如更新队列的offset状态,是否发回重试或死信队列,延迟间隔时间选择等;

  • 消费进度管理模块;该模块要管理已分配分区的消费进度信息,以及提交进度的策略,如在多线程模式下需采用滑动窗口方式来管理单分区下的并行消费进度信息,保证消息的可靠性(非本文阐述的重点,后面文章再详解);


针对消费重试功能,模块间调用执行时序如下:


基于Kafka1.0消息可靠性设计之消费重试策略

图 3 模块执行时序图


 延时消息服务简述 

本案例中,消息重试功能依赖于延迟消息的实现,即不同重试次数的消息可采用默认或手动指定延时长短来确定下次投递的时机;目前代码实现中,延迟消息需要后端消息的延迟服务来支持,其主要的特点及功能如下:


  1. 此服务需在broker上创建不同延迟等级的topic(目前包含20个延迟topics,每个topic采用3副本和高可靠设置);

  2. 每个延迟等级的topic,都有固定的延迟周期(从5s到2h的区间分布);

  3. 该服务应对每个延迟topic都有一组消费实例在订阅消息,目前消费实例数与broker数量相同,保证服务的failover能力;

  4. 该服务通过DelayQueue来实现消息的过期判断,过期的消息被发送至真实的topic保存(真实topic属性保存在消息的header中);

  5. 判断消息过期的时间取自kafka消息的存储时间(appendLogTime),以此降低客户端机器与kafka服务器时间不同步的误差;


后端延迟服务需要解决几个重点问题:


  • 保证延迟消息消费进度提交的可靠性,确保消息不丢;

  • 解决发送失败重试与延迟队列消费堵塞的问题;

  • 消息延时投递的精度问题;从目前延迟需求和吞吐来分析,jdk的DelayQueue已能满足现在的精度要求,若需要更高的精度后面会考虑用timewheel来实现;


目前,消息重试功能强依赖于后端的延迟消息服务,以实现不同延迟级别下消息重新投递的功能;此项服务除了支持消费端延时重试功能外,也能支持发送端按用户指定的时延级别进行延迟消息的发送。


 总结 

基于Kafka1.0引擎实现消费端的重试策略,能很好地提升消费的性能和可靠性,但消费的可靠性并非只依赖重试机制就能解决;比如消费进度的提交,在多线程并发时会出现进度的异常覆盖而导致消息的丢失,这就需要多种可靠机制的组合才能达到理想效果。


后面我们将介绍更多的保障机制和新功能扩展来提升kafka可靠性和应用范围,例如:通过队列的滑动窗口来改善消息进度提交的可靠性、同group组内如何实现广播机制、消息的灰度机制、如何支持消息的动态回溯等功能特性。


推荐阅读

kafka数据可靠性深度解读



灾难连锁之RedisCluster下线节点导致集群崩溃事件整理反思



VDL:唯品会强一致、高可用、高性能分布式日志存储介绍(产品篇)


欢迎投稿!!

只要是技术相关的文章尽管砸过来!


以上是关于基于Kafka1.0消息可靠性设计之消费重试策略的主要内容,如果未能解决你的问题,请参考以下文章

Kafka1.0.0如何查看队列消费进度

RocketMQ 消费重试简介

四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?

四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?

rocketmq杂记-失败重试

rocketmq杂记-失败重试