获取 Kafka 压缩消息大小

Posted

技术标签:

【中文标题】获取 Kafka 压缩消息大小【英文标题】:Get Kafka compressed message size 【发布时间】:2018-10-19 11:17:54 【问题描述】:

我想知道 kafka 中消息的压缩大小。

我使用 kafka 1.1.0 和 java kafka-connect 1.1.0 将消息从我的生产者发送到主题。

如果消息对于我的生产者来说太大,我会得到一个

序列化时消息为 xxx 字节,大于您使用 max.request.size 配置配置的最大请求大小。

将 max.request.size 设置为合适的值会导致来自代理的错误消息,因为 message.max.bytes 也必须在代理配置中相应地进行调整。不幸的是,错误消息不包括代理收到的消息的大小。我调整了 message.max.bytes。到目前为止一切顺利。

如果我在生产者端激活压缩,max.request.size 仍然必须与未压缩时的大小相同,因为不幸的是代码在压缩之前比较了未压缩消息的大小(请参阅https://issues.apache.org/jira/browse/KAFKA-4169)

但是通过压缩,我可以减少代理中的 message.max.bytes。问题是我无法确定此压缩消息的大小。有什么方法可以在发送消息之前的生产者代码中或稍后在日志文件中解决这个问题?

在我的压缩情况下,message.max.bytes 的默认值 1MB 就足够了,因此我不必更改默认配置。但我想知道我的压缩消息是低于 1MB 还是只有 0.99MB。在这种情况下,我可能会在生产中增加 message.max.bytes 以避免出现问题。

提前感谢您的支持。

【问题讨论】:

【参考方案1】:

您可以做的是使用压缩库,自己压缩消息,在发送前检查大小。例如,假设您使用的是 lz4 压缩,您可以使用 lz4-java lib 然后类似:

private static LZ4Compressor COMPRESS = LZ4Factory.fastestInstance().highCompressor();

String meMessageString      = "My Message that I am sending to kafka";
byte[] uncompressedBytes    = jsonRequest.getBytes();
long lz4compressedLength    = COMPRESSOR.compress(uncompressedBytes).length;

【讨论】:

【参考方案2】:

为了测试 snappy 压缩消息,您可以执行以下操作。

pip install python-snappy
python -m snappy -c input.json output.snappy

【讨论】:

以上是关于获取 Kafka 压缩消息大小的主要内容,如果未能解决你的问题,请参考以下文章

获取kafka消息处理时间

关于怎么获取kafka指定位置offset消息

无法从自动创建的 kafka 主题中获取消息

如何根据时间戳获取Kafka消息

如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]

Kafka API获取非compacted topic总消息数