Apache Kafka 客户端何时抛出“批次过期”异常?
Posted
技术标签:
【中文标题】Apache Kafka 客户端何时抛出“批次过期”异常?【英文标题】:When does the Apache Kafka client throw a "Batch Expired" exception? 【发布时间】:2016-04-20 01:18:54 【问题描述】:使用 Apache Kafka Java 客户端 (0.9),我尝试使用 Kafka Producer class 向代理发送一长串记录。
异步send method 立即返回一段时间,然后开始在每个调用上阻塞一小段时间。大约 30 秒后,客户端开始抛出异常 (TimeoutException),并显示消息“批次过期”。
什么情况会导致这个异常被抛出?
【问题讨论】:
【参考方案1】:此异常表明您正在以比发送记录更快的速度排队记录。
当您调用send 方法时,ProducerRecord 将存储在内部缓冲区中以发送给代理。该方法在ProducerRecord 被缓冲后立即返回,无论它是否已发送。
记录被分组为批次以发送到代理,以减少每条消息的传输偷听并提高吞吐量。
将记录添加到批次后,发送该批次有一个时间限制,以确保它已在指定的时间内发送。这由 Producer 配置参数 request.timeout.ms 控制,默认为 30 秒。
如果批处理的排队时间超过了超时限制,则会抛出异常。该批次中的记录将从发送队列中删除。
使用配置参数增加超时限制,将允许客户端在过期之前将批次排队更长的时间。
【讨论】:
我已经在下面回复了您的评论,如果您有任何建议,请告诉我。 我想知道将batch.size
设置为 0(或介于 1 和标准值之间的值)实际上是否可以更好地解决问题?
嗨@JamesThomas,“表示您正在以比发送记录更快的速度排队记录”,如果我根本不想排队怎么办?我们的生产环境中会有很多流量,我们希望数据一到就发送。我们根本不想看到这个过期。我们已将 linger.ms 设置为默认值,但仍然遇到此问题。你说增加 request.timeout.ms 会增加批处理跨度。
你好@AnilKumar。在我的问题中,我发送数据的速度比我的网络接口的上传速度快。增加超时允许生产者有时间发送所有数据。如果你减少 linger 和 batch size 参数,你可以减少人为的延迟,但你仍然不能比网络更快。每条 Kafka 消息都有开销,因此拥有更大的批量大小和更长的时间将消息分组在一起可能更有意义。这里有更多信息可以解释:ingest.tips/2015/07/19/…【参考方案2】:
控制发送到代理之前的时间的参数是linger.ms
。其默认值为 0(无延迟)。
【讨论】:
从最初的问题中可能不清楚我的示例中发生了什么,我现在试图提供更多细节以使其更清楚。有关完整信息,请参阅下面的评论。我排队记录的速度超过了上传带宽,这是一个问题。【参考方案3】:我在完全不同的上下文中遇到了这个异常。
我已经设置了一个由 Zookeeper 虚拟机、代理虚拟机和生产者/消费者虚拟机组成的迷你集群。我在服务器 (9092) 和 zookeeper (2181) 上打开了所有必要的端口,然后尝试将消息从消费者/发布者 vm 发布到代理。我得到了 OP 提到的异常,但是由于到目前为止我只发布了一条消息(或者至少我尝试过),因此解决方案不能是增加超时或批量大小。所以我搜索并发现这个邮件列表描述了我在尝试从消费者/生产者 vm (ClosedChannelException) 中消费消息时遇到的类似问题:http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config 此邮件列表中的最后一篇文章实际上描述了如何解决问题。
长话短说,如果您同时遇到ChannelClosedException
和Batch Expired
异常,您可能必须在server.config
文件中将此行更改为以下内容并重新启动代理:
advertised.host.name=<broker public IP address>
如果没有设置,它会回退到 host.name
属性(可能两者都没有设置),然后回退到 InetAddress
Java 类的规范主机名,最后不是当然是正确的,因此会混淆远程节点。
【讨论】:
listeners=PLAINTEXT://domain_name:9092 port=9092 host.name=localhost advertised.host.name=domain_name.i 可以在本地发送消息,但是当我让 kafka 服务器上线时!我得到了这个异常“org.apache.kafka.common.errors.TimeoutException: Batch Expired java.util.concurrent.ExecutionException”。我哪里错了 配置文件在<kafka_dir>/config/server.properties
下0.10.2.1
如果其他人在看。【参考方案4】:
当您创建消费者时,将 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 true。
【讨论】:
与生产者抛出异常无关。【参考方案5】:我正在使用 Kafka Java 客户端版本 0.11.0.0。我也开始看到相同的模式无法始终如一地生成大消息。它通过了一些消息,而对其他一些消息则失败了。 (虽然通过和失败的消息大小相同)。就我而言,每条消息的大小约为 60KB,远高于 Kafka 的默认 batch.size
16kB,我的 linger.ms
也设置为默认值 0。由于 Producer 客户端在接收到来自服务器的成功响应之前超时,因此引发了此错误。基本上,在我的代码中,此调用超时:kafkaProd.send(pr).get()
。为了解决这个问题,我不得不将 Producer 客户端的默认 request.timeout.ms
增加到 60000
【讨论】:
延长超时时间可能会有所帮助,但看起来我们只是在努力解决问题,而不是解决根本原因。 面临类似问题。我曾尝试将超时设置为 120000,但如果负载在 120000 毫秒后存在,那么生产者会抛出批次过期。【参考方案6】:在 docker-compose 中运行 Kafka 时遇到了类似的问题。 我的 docker-compose.yml 设置为
KAFKA_ADVERTISED_HOST_NAME: kafka
ports:
- 9092:9092
但是当我尝试从外部码头用骆驼发送消息时
to("kafka:test?brokers=localhost:9092")
我遇到了 TimeoutException。我通过添加解决了它
127.0.0.1 kafka
到 Windows\System32\drivers\etc\hosts 文件,然后将我的骆驼网址更改为
to("kafka:test?brokers=kafka:9092")
【讨论】:
实际上我的问题是由于我的主题设置不正确造成的。从 docker 中清除所有持久数据并重新开始,它工作正常。 谢谢@Rory G!我以为我要疯了。一切正常,然后停止工作。我在这个问题上追了一段时间,然后听从你的建议,在重新启动一切之前清理我的容器和存储,它解决了我的问题。远距离的动作很诡异,我不希望每次都成为问题。 仅仅维护 Kafka 周围的东西就开始吓到我了。跑一两个月后随机休息(貌似是消费者驱动)。不喜欢。【参考方案7】:我解决了。
我的Kafka部署在一个Docker容器中,容器的网络模式是桥接,主机和容器使用端口映射,我把Kafka服务器的默认端口改为9102。
server.properties 中解决问题的配置项是这两个: 听众 广告.listeners
我尝试了几种组合:
成功:
listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102
服务器无法启动:
listeners=PLAINTEXT://192.168.0.136:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102
超时错误:
listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://:9102
【讨论】:
以上是关于Apache Kafka 客户端何时抛出“批次过期”异常?的主要内容,如果未能解决你的问题,请参考以下文章
kafka jdbc sink连接器抛出org.apache.kafka.connect.errors.DataException(结构模式的字段名称未正确指定)插入PG表