Spring Kafka 生产者抛出 TimeoutExceptions

Posted

技术标签:

【中文标题】Spring Kafka 生产者抛出 TimeoutExceptions【英文标题】:Spring Kafka producers throwing TimeoutExceptions 【发布时间】:2018-11-16 10:46:27 【问题描述】:

问题

我在 Kubernetes 中有一个包含三个代理的 Kafka 设置,根据https://github.com/Yolean/kubernetes-kafka 的指南进行设置。从 Java 客户端生成消息时出现以下错误消息。

2018-06-06 11:15:44.103 ERROR 1 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='[...redacted...]':
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topicname-0: 30001 ms has passed since last append

详细设置

侦听器设置为允许来自外部世界的 SSL 生产者/消费者:

advertised.host.name = null
advertised.listeners = OUTSIDE://kafka-0.mydomain.com:32400,PLAINTEXT://:9092
advertised.port = null
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OUTSIDE:SSL
listeners = OUTSIDE://:9094,PLAINTEXT://:9092
inter.broker.listener.name = PLAINTEXT
host.name =
port.name = 9092

OUTSIDE 侦听器正在侦听 kafka-0.mydomain.com、kafka-1.mydomain.com 等。纯文本侦听器正在侦听任何 IP,因为它们是 Kubernetes 的集群本地。

生产者设置:

kafka:
  bootstrap-servers: kafka.mydomain.com:9092
  properties:
    security.protocol: SSL
   producer:
    batch-size: 16384
    buffer-memory: 1048576 # 1MB
    retries: 1
    ssl:
      key-password: redacted
      keystore-location: file:/var/private/ssl/kafka.client.keystore.jks
      keystore-password: redacted
      truststore-location: file:/var/private/ssl/kafka.client.truststore.jks
      truststore-password: redacted

此外,我在代码中将linger.ms 设置为 100,这会强制消息在 100 毫秒内传输。延迟时间故意设置得很短,因为用例需要最小的延迟。

分析

在将代理移至 SSL 时开始出现错误。 在服务器端,一切都按预期运行,日志中没有错误,我可以使用 Kafka 客户端工具手动连接到代理。 这些错误间歇性地出现:有时它每秒发送 30 多条消息,有时它什么也不发送。它可能会像魅力一样工作几个小时,然后只是垃圾邮件超时一段时间。 客户端和服务器的时钟同步 (UTC)。 生产端和服务器端的 CPU 始终保持在 20% 左右。

会是什么?

【问题讨论】:

会不会是时钟偏差?当服务器/客户端时钟相差太多时,我看到了 SSL 错误。 @Hitobat 感谢您的回复。两个时钟在 UTC 时是同步的——我认为也不可能是这样,因为那时我希望它总是工作或永远工作。不幸的是,这些错误在没有明显原因的情况下出现和消失。 【参考方案1】:

此问题通常发生在生产者比代理快时,您的设置发生这种情况的原因似乎是 SSL 需要额外的 CPU,这可能会减慢代理的速度。但无论如何检查以下内容:

检查您是否以相同的速度生成消息,根据您所说的似乎您有峰值。 另一种可能性是集群中的其他 kafka 客户端(生产者或消费者)不一定使用相同的主题,由于代理过载(检查代理 cpu/网络)而导致这种情况发生。

为了最大限度地减少导致这种保留的任何原因,您应该将buffer-memory 增加到超过 32MB,认为 32MB 是默认值并且您将其设置得更低。越低,缓冲区越容易满,如果发生这种情况,它最多会阻塞max.block.ms,并且在request.timeout.ms之后请求将超时。

另一个你应该增加的参数是batch-size,这个参数是字节数,不是消息数。 linger.ms 也应该增加,如果这个生产者消息是在用户请求时间内创建的,不要增加太多,1-4 ms 是一个不错的选择。

batch.size 已满或需要比linger.ms 更长的时间才能获得比batch.size 更多的数据时,将发送消息。大批量在正常情况下会增加吞吐量,但如果 linger 太低,则无济于事,因为您将在有足够数据获取batch.size 之前发送。

还要重新检查生产者日志是否正确加载了属性。

【讨论】:

感谢您的回复。我正在制作的应用程序具有一些近乎实时的属性,这就是为什么我将批量大小和延迟毫秒设置得尽可能低。奇怪的是,生产者和服务器的 CPU 很少超过 20% 并且使用率相当稳定 - 似乎那里没有问题。来自生产者的网络访问是不间断的(每分钟发生一次 HTTPS 心跳),并且服务器在具有两个节点的 Google Cloud 中运行(该网络不太可能成为那里的瓶颈)。我将开始监测峰值,但我不相信它们正在发生。

以上是关于Spring Kafka 生产者抛出 TimeoutExceptions的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot 和 Kafka,Producer 抛出异常,key='null'

spring kafka producer 生产者

Kafka生产者——结合spring开发

Spring Kafka - “ErrorHandler 抛出异常”并丢失了一些记录

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍

spring整合kafka项目生产和消费测试结果记录