Kafka 生产者 TimeoutException: Expiring 1 record(s)

Posted

技术标签:

【中文标题】Kafka 生产者 TimeoutException: Expiring 1 record(s)【英文标题】:Kafka producer TimeoutException: Expiring 1 record(s) 【发布时间】:2018-03-20 20:35:23 【问题描述】:

我正在使用带有 Spring-boot 的 Kafka:

Kafka Producer 类

@Service
public class MyKafkaProducer 

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);

    // Send Message
    public void sendMessage(String topicName, String message) throws Exception 
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() 
            @Override
            public void onSuccess(SendResult<String, String> result) 
                LOGGER.debug("sent message='' with offset=", message, result.getRecordMetadata().offset());
            

            @Override
            public void onFailure(Throwable ex) 
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            
        );
    

Kafka-配置:

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093

假设我在主题 my-test-topic 中发送了 10 次 1000 条消息。

10 次中有 8 次我成功地在我的消费者中获得了所有消息,但有时我得到以下 错误

2017-10-05 07:24:11, [ERROR] [my-service - LoggingProducerListener - onError:76] Exception thrown when sending a message with key='null' and payload='"deviceType":"X","deviceKeys":["apiKey":"X-X-o"],"devices...' to topic my-test-topic

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-test-topic-4 due to 30024 ms has passed since batch creation plus linger time

【问题讨论】:

您描述的这个错误是来自生产者还是消费者? 生产者收到此错误 所以,对于这样一个“低”request.timeout.ms,您的批次太慢了。试着把batch-size 调低一点 30秒还不够吗?(我是Kafka新手,请多包涵) 我不知道,但根据您的错误,您确实超过了 30 秒:due to 30024 ms has passed 【参考方案1】:

有3种可能:

    增加request.timeout.ms - 这是 Kafka 将等待整个批次在缓冲区中准备就绪的时间。因此,在您的情况下,如果缓冲区中的消息少于 100 000 条,则会发生超时。更多信息在这里:https://***.com/a/34794261/2707179 减少batch-size - 与上一点相关,它将更频繁地发送批次,但它们包含的消息会更少。 根据消息大小,您的网络可能无法赶上高负载?检查您的吞吐量是否不是瓶颈。

【讨论】:

自从我在 Kafka 上启用 SSL 后,我遇到了与 OP 相同的问题,并且请注意,和我一样,他设置了 linger.ms。根据文档,即使批次未满,也会在此延迟时间之后发送批次,因此即使批次大小很大,它也不应该超时。 @michalbrz 阅读理解这两篇文章后:1)***.com/a/34794261/4038460和2)cloudera.com/documentation/kafka/latest/topics/…。我觉得我们应该增加批量大小以避免超时。如果我们增加batch-size -> 批次数量将减少 -> 请求数量将减少 -> 发送记录所需的时间将减少 -> 超时不会经常发生【参考方案2】:

    错误中的第一条线索是30024 ms has passed - 配置spring.kafka.producer.request.timeout.ms=30000 是相关的。这 30 秒的等待是为了填满 Producer 端的缓冲区。

    当消息发布时,它会在 Producer 端进行缓冲,并等待 30 秒(参见上面的配置)填满。 spring.kafka.producer.batch-size=100000 表示 100KB,因此如果消息摄取负载较低,并且缓冲区在 30 秒内没有填满 100KB 的更多消息,您会收到此消息。

    spring.kafka.producer.linger.ms=10 用于摄取负载较高且生产者希望限制对 Kafka 代理的 send() 调用。这是在批处理准备好后(即缓冲区填充到批处理大小为 100KB 之后)生产者在向代理发送消息之前将等待的持续时间。

解决方案:

增加linger.ms 以在批处理准备好后更长时间地保留消息。如果需要更多时间来填充批次,请增加 request.timeout.ms。 另一种方法:减少batch-size,或增加request.timeout.ms,或两者兼而有之。

【讨论】:

【参考方案3】:

我通过正确寻址主机 spring.kafka.bootstrap-servers 及其 DNS 解决了这个问题。即使网络解析了 IP 地址,它似乎也需要 DNS。

【讨论】:

以上是关于Kafka 生产者 TimeoutException: Expiring 1 record(s)的主要内容,如果未能解决你的问题,请参考以下文章

如何保证kafka生产者发送消息的可靠性

kafka----;kafka生产者

Kafka生产者

Kafka 学习kafka 生产者幂等性

Kafka 系列—— Kafka 生产者详解

Kafka02--Kafka生产者简要原理