在火花结构化流中反序列化 kafka avro 主题的 int 编码无效

Posted

技术标签:

【中文标题】在火花结构化流中反序列化 kafka avro 主题的 int 编码无效【英文标题】:Invalid int encoding on deserializing kafka avro topics in spark structured streaming 【发布时间】:2019-04-30 05:20:59 【问题描述】:

我正在尝试使用 spark 结构化流 (version-2.3.1) 处理来自 kafka 的流式 avro 数据,因此我尝试使用 this 示例进行反序列化。 它仅在主题 value 部分包含 StringType 时才有效,但在我的情况下,架构包含 long and integers,如下所示:

public static final String USER_SCHEMA = ""
        + "\"type\":\"record\","
        + "\"name\":\"variables\","
        + "\"fields\":["
        + "   \"name\":\"time\", \"type\":\"long\" ,"
        + "   \"name\":\"thnigId\", \"type\":\"string\" ,"
        + "   \"name\":\"controller\", \"type\":\"int\" ,"
        + "   \"name\":\"module\", \"type\":\"int\" "
        + "]";

所以它给出了一个例外

sparkSession.udf().register("deserialize", (byte[] data) -> 
GenericRecord record = recordInjection.invert(data).get(); //throws error at invert method.
return RowFactory.create(record.get("time"), record.get("thingId").toString(), record.get("controller"), record.get("module"));
    , DataTypes.createStructType(type.fields()));

Failed to invert: [B@22a45e7
Caused by java.io.IOException: Invalid int encoding.

因为我在架构 long and int 类型中有 time, controller and module

我猜这是字节数组byte[] data的某种编码和解码格式错误。

【问题讨论】:

【参考方案1】:

你看过这个https://issues.apache.org/jira/browse/AVRO-1650。它专门讨论了您可能遇到的问题。默认的 UTF-8 编码可能会导致编码/解码过程中的丢失。

如果您正在处理二进制编码的数据,我还建议您使用Base64 编码来保存/传输数据,因为它利用 ISO-8859-1,这是按照上面链接使用的正确编码。

【讨论】:

问题是我试图反序列化 confluent avro,它的元数据很长,而 avro 记录就是问题所在。【参考方案2】:

我也遇到过这种情况,我想也许你配置你的kafka value-deserializer是默认的String deserializer,你可以尝试将deserializer更改为org.apache.kafka.common.serialization.ByteArrayDeserializer。

这是我的解决方案。

希望能帮到你

【讨论】:

以上是关于在火花结构化流中反序列化 kafka avro 主题的 int 编码无效的主要内容,如果未能解决你的问题,请参考以下文章

使用 kafka lib 反序列化 PRIMITIVE AVRO KEY

Python AVRO阅读器在解码kafka消息时返回AssertionError

使用 Newtonsoft Json 从流中反序列化多个 json 对象

在Kafka中使用Avro编码消息:Producter篇

如何在 Apache Kafka 中使用 AVRO 序列化处理嵌套的源数据?

[C#]如何使用Newton.Json从流中反序列化json数据