使用 Kafka 处理大消息

Posted

技术标签:

【中文标题】使用 Kafka 处理大消息【英文标题】:Handling Large Messages with Kafka 【发布时间】:2019-08-04 17:45:25 【问题描述】:

如何在 Kafka 中处理超过 20MB 等的大消息

[2019-03-13 08:59:10,923] ERROR 向主题测试发送消息时出错,键:13 字节,值:11947696 字节,错误:(org.apache.kafka.clients.producer. internals.ErrorLoggingCallback) org.apache.kafka.common.errors.RecordTooLargeException:请求包含的消息大于服务器将接受的最大消息大小。

[2019-03-13 03:59:14,478] ERROR 向主题测试发送消息时出错,键:13 字节,值:11947696 字节,错误:(org.apache.kafka.clients.producer. internals.ErrorLoggingCallback) org.apache.kafka.common.errors.RecordTooLargeException: 序列化时消息为 11947797 字节,大于您使用 max.request.size 配置配置的最大请求大小。

【问题讨论】:

【参考方案1】:

我们需要设置以下配置

经纪人

replica.fetch.max.bytes:更改此属性将允许代理中的副本在集群内发送消息并确保消息被正确复制。如果这太小,那么消息将永远不会被复制,因此,消费者将永远看不到该消息,因为该消息永远不会被提交(完全复制)。

message.max.bytes:这是代理可以从生产者那里接收到的最大消息大小。

经纪人(主题)

ma​​x.message.bytes:Kafka 允许的最大记录批大小。如果增加了这个值并且有超过 0.10.2 的消费者,消费者的获取大小也必须增加,以便他们可以获取这么大的记录批次。在最新的消息格式版本中,为了提高效率,记录总是分组为批次。在以前的消息格式版本中,未压缩的记录不会分组,并且此限制仅适用于这种情况下的单个记录(默认为代理的 message.max.bytes)。

制片人

ma​​x.request.size:请求的最大大小(以字节为单位)。此设置将限制生产者在单个请求中发送的记录批次数,以避免发送大量请求。这也有效地限制了最大记录批量大小。请注意,服务器对记录批量大小有自己的上限,可能与此不同。

compression.type:设置为snappy,这将增加单个请求可以发送的数据总量,应该与更大的batch.size配对。

buffer.memory:如果启用压缩,缓冲区大小也应该增加。

batch.size:批量大小至少应为 10s KB,在 300kb 左右可以看到收益递减(远程客户端更少)。更大的批次也会产生更好的压缩比。

linger.ms: linger.ms 抢占任何设置在批量大小上的界限。增加此值以确保在较慢的生产时间不会发送较小的批次

消费者

fetch.message.max.bytes:这将确定消费者可以获取的最大消息大小。

ma​​x.partition.fetch.bytes:服务器将返回的每个分区的最大数据量。

【讨论】:

以上是关于使用 Kafka 处理大消息的主要内容,如果未能解决你的问题,请参考以下文章

专为实时而构建:使用Apache Kafka进行大数据消息传递 第2部分

专为实时而构建:使用Apache Kafka进行大数据消息传递,第1部分

kafka中处理超大消息的一些考虑

带你了解大数据消息流系统Kafka

大数据之Kafka

Kafka,Mq和Redis作为消息队列使用