架构实战(10)——消息处理中的死循环

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了架构实战(10)——消息处理中的死循环相关的知识,希望对你有一定的参考价值。

参考技术A

消息中间件有很多种,在使用消息队列时,消费消息一般有两种模式,推送模式(Push)和拉取模式(Pull)。有些中间件会支持两种,例如RabbitMq;有些支持一种,例如Kafka只支持Pull。在项目中,应用了Aws SQS服务,只支持Pull模式,以此为出发点,谈谈消息处理中的死循环。

在code review时,发现同事写了如下的这段代码。基本逻辑没有问题,读数据-->处理数据-->ACK。请注意用到了Thread.sleep()在每次处理完成后,线程暂停100ms。他的解释是,如果不用sleep,会导致死CPU占用率100%。

他的这种解释正确吗?首先,read和ack过程,都是包含网络IO操作的。其次,process函数也有读库、写库的代码,同样包含网络IO操作。网络IO操作和CPU的运行速度相比,是非常慢的,即使没有sleep函数,网络IO过程中,CPU存在等待状况,不会出现CPU占用率100%的情况。另一方面,这种写法限定了单线程的处理能,因为每次最多可以从queue中读取10条数据,每秒最多能读取100条,考虑到处理各环节的时间损耗,处理能力更弱。

通过分析,做了第一版的修改。读到数据正常处理,读取不到数据线程暂停100ms,这样可以避免空队列造成大量的读。

事实证明,这样的修改是合适的,在压力测试过程中,同时启动8个线程,在2核心16G内存的服务器上,CUP使用率达到70%左右,单实例的处理能力1200/s,达到设计预期。测试结果和具体的业务逻辑有关,具体实践中,根据实际运行情况

首先弄明白ack的作用。通过read函数从队列中pull数据,数据并没有在queue中删除,而是设置了消息对其他消息在时效期内,其他线程不可见;超过时效期,消息就可以被其他线程可见。调用ack函数,则是从队列中删除消息,其他线程任何时间都不可见了。(这是SQS的实现,其他消息队列可能有些差异,但是ack的作用是一样的)。

上述代码中,如果prcess过程发生异常,会跳过ack函数,直接跳转到异常处理逻辑。结果就是消息无法被删除,其他线程或本线程在时效期过后,再次读到这条消息,但同样会处理失败,这样就陷入了同一个消息处理的死循环。上述代码的另一个问题,每次读取10条,其中一条失败导致所有的消息无法ack,这其中可能包含已经处理过的消息,导致消息的重复消费。

针对这两个个问题,有两个方面可以优化:

1、死信队列

在队列中存放太久的消息,被称为是死信,存放的时间是可以配置的,并可以转发到另外的队列,称之为死信队列。假设死信时间为30min,消息可见性的时效期配置为5s,一个失败的消息最多可能被处理360次,必然会拖累消息处理能力。

显然,仅仅通过死信队列,是无法优雅完美的解决问题的,还需要对代码进行再次优化。

2、代码优化

结合死信队列的作用,再来完善代码。逐条处理消息,处理出现异常,尝试把消息存库,然后调用ack。这样只有在process和save同时失败时,才会走到死信队列的逻辑,大大减少了系统的压力。

以上讲述的是pull模式的死循环问题,其实是非常明显的,因为看到了while关键字的存在。推送模式下,死循环隐藏的就有点深了。

push模式中,一般通过listener实现,以RabbitMQ为例,示例代码如下。

Spring 的RebbitMQ Template,隐藏了ack的过程,这样的代码逻辑上并没有问题。但正是这样简洁的一段代码,发生过一次严重的线上故障,已经过去了2年多,但至今仍然记忆犹新。

RebbitMQ Template在实现的过程中,要考虑消息被正确的ack,策略就是只有被正确处理的消息才能被ack。简化一下,代码逻辑如下:

虽然在自己写的代码中没有whilie,但是在Template中存在。如果在处理的过程中出现异常,消息不能被ack,rabbitMq会再次推送此消息,这样就造成了单条消息的处理死循环。更严重是,RabbitMQ并没有时效期的概念,失败的消息立即就会被推送回来。

两年前的那次线上故障是因为处理消息是发生空指针错误,消息不能被处理,消息不停地被推送到处理实例。单条消息的处理,并没有其他性能损失,导致CPU使用率100%,并且输出非常多的日志,每小时几十个G。而处理消息的服务实例,在服务集群中对外提供接口供其他服务调用,拖累其他服务的长时间等待,进而引发雪崩效应,导致整个集群的不可用。事后对日志的分析发现,在一小时内对单条消息的处理次数超过40万次,非常夸张。

这个问题的处理也非常简单,解决掉空指针异常,并且优化处理逻辑。在以后的工作中,时刻有了这个紧绷的弦,防止消息处理的死循环。

以上是关于架构实战(10)——消息处理中的死循环的主要内容,如果未能解决你的问题,请参考以下文章

Android开发人员必看的 Handler 消息处理机制(源码实战)

Android中的消息处理机制

Android中的Handler机制

腾讯资深架构师给你讲解 kafka的基本原理,带你实战实践

Kafka 业务架构及消息丢失处理方案

Android 消息处理机制