在火花结构化流中反序列化 kafka avro 主题的 int 编码无效
Posted
技术标签:
【中文标题】在火花结构化流中反序列化 kafka avro 主题的 int 编码无效【英文标题】:Invalid int encoding on deserializing kafka avro topics in spark structured streaming 【发布时间】:2019-04-30 05:20:59 【问题描述】:我正在尝试使用 spark 结构化流 (version-2.3.1) 处理来自 kafka 的流式 avro 数据,因此我尝试使用 this 示例进行反序列化。
它仅在主题 value
部分包含 StringType
时才有效,但在我的情况下,架构包含 long and integers
,如下所示:
public static final String USER_SCHEMA = ""
+ "\"type\":\"record\","
+ "\"name\":\"variables\","
+ "\"fields\":["
+ " \"name\":\"time\", \"type\":\"long\" ,"
+ " \"name\":\"thnigId\", \"type\":\"string\" ,"
+ " \"name\":\"controller\", \"type\":\"int\" ,"
+ " \"name\":\"module\", \"type\":\"int\" "
+ "]";
所以它给出了一个例外
sparkSession.udf().register("deserialize", (byte[] data) ->
GenericRecord record = recordInjection.invert(data).get(); //throws error at invert method.
return RowFactory.create(record.get("time"), record.get("thingId").toString(), record.get("controller"), record.get("module"));
, DataTypes.createStructType(type.fields()));
说
Failed to invert: [B@22a45e7
Caused by java.io.IOException: Invalid int encoding.
因为我在架构 long and int
类型中有 time, controller and module
。
我猜这是字节数组byte[] data
的某种编码和解码格式错误。
【问题讨论】:
【参考方案1】:你看过这个https://issues.apache.org/jira/browse/AVRO-1650。它专门讨论了您可能遇到的问题。默认的 UTF-8 编码可能会导致编码/解码过程中的丢失。
如果您正在处理二进制编码的数据,我还建议您使用Base64 编码来保存/传输数据,因为它利用 ISO-8859-1,这是按照上面链接使用的正确编码。
【讨论】:
问题是我试图反序列化 confluent avro,它的元数据很长,而 avro 记录就是问题所在。【参考方案2】:我也遇到过这种情况,我想也许你配置你的kafka value-deserializer是默认的String deserializer,你可以尝试将deserializer更改为org.apache.kafka.common.serialization.ByteArrayDeserializer。
这是我的解决方案。
希望能帮到你
【讨论】:
以上是关于在火花结构化流中反序列化 kafka avro 主题的 int 编码无效的主要内容,如果未能解决你的问题,请参考以下文章
使用 kafka lib 反序列化 PRIMITIVE AVRO KEY
Python AVRO阅读器在解码kafka消息时返回AssertionError
使用 Newtonsoft Json 从流中反序列化多个 json 对象