如何从 Debezium 创建的 avro 消息中获取字段?

Posted

技术标签:

【中文标题】如何从 Debezium 创建的 avro 消息中获取字段?【英文标题】:How to get field from avro message created by Debezium? 【发布时间】:2020-05-22 20:16:35 【问题描述】:

我想根据 ts_ms 时间过滤我的消息。问题是我无法从 avro 消息中获取 ts_ms。这是我精简的 avro .avsc 文件:


  "type": "record",
  "name": "Envelope",
  "namespace": "mysql.company.scores",
  "fields": [
    
      "name": "before",
      "type": [
        "null",
        
          "type": "record",
          "name": "Value",
          "fields": [
            <Some fields based on scores table>
          ],
          "connect.name": "mysql.company.scores.Value"
        
      ],
      "default": null
    ,
    
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    ,
    
      "name": "source",
      "type": 
        "type": "record",
        "name": "Source",
        "namespace": "io.debezium.connector.mysql",
        "fields": [
          
            "name": "version",
            "type": [
              "null",
              "string"
            ],
            "default": null
          ,
          
            "name": "connector",
            "type": [
              "null",
              "string"
            ],
            "default": null
          ,
          
            "name": "name",
            "type": "string"
          ,
          
            "name": "server_id",
            "type": "long"
          ,
          
            "name": "ts_sec",
            "type": "long"
          ,
          
            "name": "gtid",
            "type": [
              "null",
              "string"
            ],
            "default": null
          ,
          
            "name": "file",
            "type": "string"
          ,
          
            "name": "pos",
            "type": "long"
          ,
          
            "name": "row",
            "type": "int"
          ,
          
            "name": "snapshot",
            "type": [
              
                "type": "boolean",
                "connect.default": false
              ,
              "null"
            ],
            "default": false
          ,
          
            "name": "thread",
            "type": [
              "null",
              "long"
            ],
            "default": null
          ,
          
            "name": "db",
            "type": [
              "null",
              "string"
            ],
            "default": null
          ,
          
            "name": "table",
            "type": [
              "null",
              "string"
            ],
            "default": null
          ,
          
            "name": "query",
            "type": [
              "null",
              "string"
            ],
            "default": null
          
        ],
        "connect.name": "io.debezium.connector.mysql.Source"
      
    ,
    
      "name": "op",
      "type": "string"
    ,
    
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    
  ],
  "connect.name": "mysql.company.scores.Envelope"


我可以在之前或之后访问,但是当我可以使用 getTs_ms 进行以下方法时,我得到符号找不到方法:

private boolean isRecordNew(mysql.company.scores.Envelope value)
        return value.getTs_ms() > 1580988600000L;
    

这是我的 serde 类的相关部分:

public static Serde<mysql.company.scores.Envelope> getEnvelopeSerde() 
        SpecificAvroSerde<mysql.company.scores.Envelope> scoreSerde = new SpecificAvroSerde();
        scoreSerde.configure(
                Collections.singletonMap(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                        schemaRegistryUrl), false);
        return scoreSerde;
    

我是否可以使用相同的 serde 类访问 ts_ms 字段,或者我应该更改它以使其包含在值中?

【问题讨论】:

为什么要创建自己的 serde?​​span> 为什么不能看一下生成的信封的Java类,看看字段或方法在哪里? @cricket_007 奇怪的是它曾经将它生成为 ts_ms 但现在它是 tsMs 【参考方案1】:

正如@cricket_007 在评论中提到的,我查看了生成的类,该字段名为getTsMs(),并通过使用此方法解决了问题。

【讨论】:

以上是关于如何从 Debezium 创建的 avro 消息中获取字段?的主要内容,如果未能解决你的问题,请参考以下文章

使用带有 Avro 序列化的 Debezium mongodb CDC 创建的模式太多

在Avro Debezium数据上创建一个基于Avro的KSQL流,会产生奇怪的模式。

Debezium 将 Avro 数据视为二进制

无法读取 Kafka 主题 avro 消息

Debezium 可能会产生无效的模式

如何以 JSON 格式转换 debezium 消息,以便可以将其加载到 Redshift