如何在 Apache Kafka 中使用 AVRO 序列化处理嵌套的源数据?
Posted
技术标签:
【中文标题】如何在 Apache Kafka 中使用 AVRO 序列化处理嵌套的源数据?【英文标题】:How do you handle nested source data with AVRO serialization in Apache Kafka? 【发布时间】:2021-11-15 23:39:20 【问题描述】:我的目标是从 HTTP 源获取 JSON 数据并使用 AVRO 序列化将其存储在 Kafka 主题中。
使用 Kafka Connect 和 HTTP source connector 以及一堆 SMT,我设法创建了一个 Connect 数据结构,当使用 StringConverter 写入主题时如下所示:
Structbase=stations,cod=200,coord=Structlat=54.0,lon=9.0,dt=1632150605
因此,JSON 被成功解析为 STRUCT,我可以使用 SMT 操作单个元素。接下来,我在 Confluent Schema Registry 中创建了一个具有相应模式的新主题,并将连接器的值转换器切换到 Confluent AVRO 转换器,并使用"value.converter": "io.confluent.connect.avro.AvroConverter"
。
我收到一条错误消息,而不是预期的序列化:
org.apache.kafka.common.errors.SerializationException:序列化 Avro 消息时出错 引起:org.apache.avro.SchemaParseException:无法重新定义:io.confluent.connect.avro.ConnectDefault
只要我使用 ReplaceField 删除嵌套的 STRUCT 或使用 Flatten 简化结构,AVRO 序列化就像一个魅力。所以看起来转换器无法处理嵌套结构。
当您有嵌套元素并希望它们被序列化而不是将 JSON 存储为字符串并尝试在消费者或其他地方处理对象创建时,正确的方法是什么?这在 Kafka Connect 中是否可行?
【问题讨论】:
Avro 可以很好地处理嵌套记录。您遇到的问题是反序列化器无法处理具有不同字段的重复/重复命名空间记录...有一些“设置架构元数据”转换,您可能想查看 这会将异常转换为org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema
。所以看起来这解决了最初的问题,但我仍然不完全在那里。它在消息中说明的模式与模式注册表中的模式相同,字符对字符。它还寻找什么来匹配架构?
按 id 检索?模式文本不应该太重要,但我个人并没有使用我提到的转换。此外,该异常似乎在反序列化器之前,而不是像您的其他错误一样
只能按名称和版本检索,但只有在您在架构定义中包含附加条目connect.name
和connect.version
时才有效。我在任何地方都没有看到记录的东西。不幸的是,我有点回到原点。内部 STRUCT coord
仍然被 io.confluent.kafka.serializers.AbstractKafkaAvroSerializer
解析为 io.confluent.connect.avro.ConnectDefault
,所以只要我添加第二个不同类型的内部 STRUCT,我就会得到原始错误。因此,我需要能够为每个嵌套元素设置元数据......但是如何?
【参考方案1】:
可以通过不同的方式从 JSON 字符串创建 STRUCT 元素。最初,使用 SMT ExpandJson 是为了简单。但是,它没有创建足够命名的 STRUCT,因为它没有可供使用的模式。这就是导致初始错误消息的原因,因为 AVRO 序列化程序对这些 STRUCT 使用泛型类 io.confluent.connect.avro.ConnectDefault
,如果存在多个 STRUCT,则会出现歧义,从而引发异常。
另一个看似相同的 SMT 是 Json Schema,它有一个记录在案的 FromJson 转换。它确实接受模式,从而解决了 ExpandJson 将嵌套元素解析为泛型类型的问题。但是,接受的是 JSON 模式,并且通过将单词“properties”作为命名空间并复制字段名称来映射到 AVRO 全名。在此示例中,您最终将使用 properties.coord
作为内部元素的全名。
例如,当将以下 JSON Schema 传递给 SMT 时:
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties":
"coord":
"type": "object",
"properties":
"lon":
"type": "number"
,
"lat":
"type": "number"
,
"required": [
"lon",
"lat"
]
,
...
它产生的 AVRO 模式(并因此在模式注册表中查找)是:
"type": "record",
"fields": [
...
"name": "coord",
"type":
"type": "record",
"name": "coord",
"namespace": "properties",
"fields": [
"name": "lat",
"type": "double"
,
"name": "lon",
"type": "double"
],
"connect.name": "properties.coord"
,
...
理论上,如果您在第二级有另一个带有coord
元素的架构,它将获得相同的全名,但由于这些不是架构注册表中需要引用的单独条目,这不会导致碰撞。无法从 JSON Schema 控制 AVRO 记录的命名空间有点遗憾,因为感觉就像你就在那里,但我无法深入挖掘以提供解决方案。
建议的 SMT SetSchemaMetadata(请参阅问题的第一个回复)在此过程中可能很有用,但 it's documentation 与 AVRO 命名约定有些冲突,因为它在示例中显示了 order-value
。它将尝试查找包含以此名称作为根元素的 AVRO 记录的模式,并且由于“-”是 AVRO 名称中的非法字符,因此您会收到错误消息。但是,如果您使用正确的根元素名称,SMT 会做一些非常有用的事情:它的 RestService
类会查询架构注册表以查找匹配的架构,但会失败并显示一条消息,打印出需要被创建,所以你不必记住所有的转换规则。
因此,原始问题的答案是:是的,可以使用 Kafka Connect 来完成。如果您这样做,这也是最好的选择
不想编写自己的生产者/连接器 希望以类型化的方式存储 JSON Blob,而不是在遇到初始主题后对其进行转换如果数据摄取后转换是一个选项,de-, re- and serialization capabilities of ksqlDB 似乎非常强大。
【讨论】:
以上是关于如何在 Apache Kafka 中使用 AVRO 序列化处理嵌套的源数据?的主要内容,如果未能解决你的问题,请参考以下文章
带有 Avro 和 Schema Repo 的 Apache Kafka - 模式 ID 在消息中的啥位置?
使用 Apache Beam 反序列化 Kafka AVRO 消息
Apache Camel Kafka 连接器:以 Avro 格式写入 GCS
Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析(示例代码
使用 Avro 序列化器将 Spark Structured Streaming 数据发送到 Confluent Kafka