如何在 Apache Kafka 中使用 AVRO 序列化处理嵌套的源数据?

Posted

技术标签:

【中文标题】如何在 Apache Kafka 中使用 AVRO 序列化处理嵌套的源数据?【英文标题】:How do you handle nested source data with AVRO serialization in Apache Kafka? 【发布时间】:2021-11-15 23:39:20 【问题描述】:

我的目标是从 HTTP 源获取 JSON 数据并使用 AVRO 序列化将其存储在 Kafka 主题中。

使用 Kafka Connect 和 HTTP source connector 以及一堆 SMT,我设法创建了一个 Connect 数据结构,当使用 StringConverter 写入主题时如下所示:

Structbase=stations,cod=200,coord=Structlat=54.0,lon=9.0,dt=1632150605

因此,JSON 被成功解析为 STRUCT,我可以使用 SMT 操作单个元素。接下来,我在 Confluent Schema Registry 中创建了一个具有相应模式的新主题,并将连接器的值转换器切换到 Confluent AVRO 转换器,并使用"value.converter": "io.confluent.connect.avro.AvroConverter"

我收到一条错误消息,而不是预期的序列化:

org.apache.kafka.common.errors.SerializationException:序列化 Avro 消息时出错 引起:org.apache.avro.SchemaParseException:无法重新定义:io.confluent.connect.avro.ConnectDefault

只要我使用 ReplaceField 删除嵌套的 STRUCT 或使用 Flatten 简化结构,AVRO 序列化就像一个魅力。所以看起来转换器无法处理嵌套结构。

当您有嵌套元素并希望它们被序列化而不是将 JSON 存储为字符串并尝试在消费者或其他地方处理对象创建时,正确的方法是什么?这在 Kafka Connect 中是否可行?

【问题讨论】:

Avro 可以很好地处理嵌套记录。您遇到的问题是反序列化器无法处理具有不同字段的重复/重复命名空间记录...有一些“设置架构元数据”转换,您可能想查看 这会将异常转换为org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema。所以看起来这解决了最初的问题,但我仍然不完全在那里。它在消息中说明的模式与模式注册表中的模式相同,字符对字符。它还寻找什么来匹配架构? 按 id 检索?模式文本不应该太重要,但我个人并没有使用我提到的转换。此外,该异常似乎在反序列化器之前,而不是像您的其他错误一样 只能按名称和版本检索,但只有在您在架构定义中包含附加条目connect.nameconnect.version 时才有效。我在任何地方都没有看到记录的东西。不幸的是,我有点回到原点。内部 STRUCT coord 仍然被 io.confluent.kafka.serializers.AbstractKafkaAvroSerializer 解析为 io.confluent.connect.avro.ConnectDefault,所以只要我添加第二个不同类型的内部 STRUCT,我就会得到原始错误。因此,我需要能够为每个嵌套元素设置元数据......但是如何? 【参考方案1】:

可以通过不同的方式从 JSON 字符串创建 STRUCT 元素。最初,使用 SMT ExpandJson 是为了简单。但是,它没有创建足够命名的 STRUCT,因为它没有可供使用的模式。这就是导致初始错误消息的原因,因为 AVRO 序列化程序对这些 STRUCT 使用泛型类 io.confluent.connect.avro.ConnectDefault,如果存在多个 STRUCT,则会出现歧义,从而引发异常。

另一个看似相同的 SMT 是 Json Schema,它有一个记录在案的 FromJson 转换。它确实接受模式,从而解决了 ExpandJson 将嵌套元素解析为泛型类型的问题。但是,接受的是 JSON 模式,并且通过将单词“properties”作为命名空间并复制字段名称来映射到 AVRO 全名。在此示例中,您最终将使用 properties.coord 作为内部元素的全名。

例如,当将以下 JSON Schema 传递给 SMT 时:


  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": 
    "coord": 
      "type": "object",
      "properties": 
        "lon": 
          "type": "number"
        ,
        "lat": 
          "type": "number"
        
      ,
      "required": [
        "lon",
        "lat"
      ]
    ,
    ...

它产生的 AVRO 模式(并因此在模式注册表中查找)是:


    "type": "record",
    "fields": [
        ...
        
            "name": "coord",
            "type": 
                "type": "record",
                "name": "coord",
                "namespace": "properties",
                "fields": [
                    
                        "name": "lat",
                        "type": "double"
                    ,
                    
                        "name": "lon",
                        "type": "double"
                    
                ],
                "connect.name": "properties.coord"
            
        ,
    ...

理论上,如果您在第二级有另一个带有coord 元素的架构,它将获得相同的全名,但由于这些不是架构注册表中需要引用的单独条目,这不会导致碰撞。无法从 JSON Schema 控制 AVRO 记录的命名空间有点遗憾,因为感觉就像你就在那里,但我无法深入挖掘以提供解决方案。

建议的 SMT SetSchemaMetadata(请参阅问题的第一个回复)在此过程中可能很有用,但 it's documentation 与 AVRO 命名约定有些冲突,因为它在示例中显示了 order-value。它将尝试查找包含以此名称作为根元素的 AVRO 记录的模式,并且由于“-”是 AVRO 名称中的非法字符,因此您会收到错误消息。但是,如果您使用正确的根元素名称,SMT 会做一些非常有用的事情:它的 RestService 类会查询架构注册表以查找匹配的架构,但会失败并显示一条消息,打印出需要被创建,所以你不必记住所有的转换规则。

因此,原始问题的答案是:是的,可以使用 Kafka Connect 来完成。如果您这样做,这也是最好的选择

不想编写自己的生产者/连接器 希望以类型化的方式存储 JSON Blob,而不是在遇到初始主题后对其进行转换

如果数据摄取后转换是一个选项,de-, re- and serialization capabilities of ksqlDB 似乎非常强大。

【讨论】:

以上是关于如何在 Apache Kafka 中使用 AVRO 序列化处理嵌套的源数据?的主要内容,如果未能解决你的问题,请参考以下文章

带有 Avro 和 Schema Repo 的 Apache Kafka - 模式 ID 在消息中的啥位置?

使用 Apache Beam 反序列化 Kafka AVRO 消息

Apache Camel Kafka 连接器:以 Avro 格式写入 GCS

Spark sql怎么使用Kafka Avro序列化器

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析(示例代码

使用 Avro 序列化器将 Spark Structured Streaming 数据发送到 Confluent Kafka