具有解码器问题的 Kafka Avro Consumer

Posted

技术标签:

【中文标题】具有解码器问题的 Kafka Avro Consumer【英文标题】:Kafka Avro Consumer with Decoder issues 【发布时间】:2016-07-01 13:07:14 【问题描述】:

当我尝试使用我各自的架构对数据运行 Kafka Consumer with Avro 时,它返回错误 "AvroRuntimeException: Malformed data. Length is negative: -40" 。我看到其他人也有类似的问题converting byte array to json、Avro write and read 和Kafka Avro Binary *coder。我还引用了这个Consumer Group Example,它们都很有帮助,但是到目前为止对这个错误没有帮助.. 它一直工作到这部分代码(第 73 行)

解码器解码器 = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);

我尝试过其他解码器并打印出 byteArrayInputStream 变量的内容,该变量看起来像我认为您期望的序列化 avro 数据的外观(在消息中我可以看到架构和一些数据以及一些格式错误的数据)我有使用 .available() 方法打印出可用的字节数,该方法返回 594。我无法理解为什么会发生此错误。 Apache Nifi 用于从 hdfs 生成具有相同模式的 Kafka 流。我将不胜感激。

【问题讨论】:

【参考方案1】:

也许问题在于 Nifi 写入(编码)Avro 数据的方式与您的消费者应用程序读取(解码)数据的方式不匹配。

简而言之,Avro 的 API 提供了两种不同的序列化方法:

    用于创建正确的 Avro 文件:对数据记录进行编码,同时将 Avro 模式嵌入到一种前导码中(通过org.apache.avro.file.DataFileWriter/DataFileReader)。将架构嵌入 Avro 文件很有意义,因为 (a) 通常 Avro 文件的“有效负载”比嵌入的 Avro 架构大几个数量级,并且 (b) 然后您可以随意复制或移动这些文件并且仍然确保您可以再次阅读它们而无需咨询某人或某事。 仅对数据记录进行编码,即不嵌入架构(通过org.apache.avro.io.BinaryEncoder/BinaryDecoder;注意包名称的区别:此处为io 与以上file)。例如,在对写入 Kafka 主题的 Avro 编码消息时,这种方法通常受到青睐,因为与上面的变体 1 相比,您不会产生将 Avro 模式重新嵌入每条消息的开销,假设您的(非常合理的)策略是,对于同一个 Kafka 主题,消息使用相同的 Avro 模式进行格式化/编码。这是一个显着的优势,因为在流数据上下文中,动态数据记录通常比上述静态数据 Avro 文件小得多(通常在 100 字节和几百 KB 之间)(通常为数百或数千 MB);因此 Avro 模式的大小相对较大,因此您不想在将 2000 条数据记录写入 Kafka 时将其嵌入 2000 倍。缺点是您必须“以某种方式”跟踪 Avro 模式如何映射到 Kafka 主题——或者更准确地说,您必须以某种方式跟踪消息是使用哪个 Avro 模式进行编码的,而无需沿着直接嵌入模式的路径进行。好消息是tooling available in the Kafka ecosystem (Avro schema registry) 可以透明地执行此操作。因此,与变体 1 相比,变体 2 以牺牲便利性为代价提高了效率。

效果是编码的 Avro 数据的“有线格式”看起来会有所不同,具体取决于您使用上述 (1) 还是 (2)。

我对 Apache Nifi 不是很熟悉,但快速浏览一下源代码(例如 ConvertAvroToJSON.java)告诉我它正在使用变体 1,即它在 Avro 记录旁边嵌入了 Avro 模式。但是,您的使用者代码使用 DecoderFactory.get().binaryDecoder() 并因此使用变体 2(未嵌入架构)。

也许这解释了您遇到的错误?

【讨论】:

谢谢@miguno 就是这样!我在使用解码器到 DataFileReader 时摇摆不定,并进行了两行更改。 DatumReader datumReader = new SpecificDatumReader(schema); DataFileStream dataFileReader = new DataFileStream(inputStream, datumReader); 更正 * 我现在正在摇摆不定,因为我通过两行更改更改为 DataFileReader。你是对的 binaryDecoder 不是这项工作的正确选择。

以上是关于具有解码器问题的 Kafka Avro Consumer的主要内容,如果未能解决你的问题,请参考以下文章

Python AVRO阅读器在解码kafka消息时返回AssertionError

使用 Kafka Avro Console Consumer 时如何为特定的 Schema 注册表传递参数?

Kafka中使用Avro编码解码消息

如何使用来自 Kafka 的 Python 解码/反序列化 Avro

具有逻辑类型的 Avro 模式不能与最新的 confluent-kafka 一起使用

数据流模板“Pub/Sub Avro to Bigquery”无法解码