具有逻辑类型的 Avro 模式不能与最新的 confluent-kafka 一起使用

Posted

技术标签:

【中文标题】具有逻辑类型的 Avro 模式不能与最新的 confluent-kafka 一起使用【英文标题】:Avro schema with logicalType can't be used with latest confluent-kafka 【发布时间】:2022-01-15 00:44:19 【问题描述】:

我们正在使用带有 avro 模式的 kafka,并且模式注册表设置为FULL 兼容性。我们的架构使用logicalType 字段,例如:


  "name": "MyRecord",
  "type": "record",
  "fields": [
    
      "name": "created_at",
      "type": [
        "null",
        
          "type": "long",
          "logicalType": "timestamp-millis"
        
      ],
      "default": null
    
  ]

这适用于我们正在使用的相当旧的confluent-kafka 版本,因为它依赖于avro-python3 1.8。但是,最近的confluent-kafka 依赖于avro-python3 1.10,消息序列化失败并出现TypeError: unhashable type: 'mappingproxy'

I've opened a PR to fix the issue 但它并没有引起太多关注。

假设它不会被合并,我还有什么其他选项可以升级到最近的confluent-kafka

我看到的唯一解决方案是摆脱 logicalType,但这将是不兼容的架构更改,因此我要么放弃 FULL 兼容性,要么使用绑定到不同架构的不同主题。

即使上述方法有效,我也必须手动将毫秒转换为时间戳,这对我们的代码库来说是一个相当大的变化。

【问题讨论】:

【参考方案1】:

您的架构不是有效的 Avro。 documentation 声明“当为类型为联合的记录字段指定默认值时,默认值的类型必须匹配联合的第一个元素”时间>。对于您的字段created_at,“null”是唯一有效的默认值,否则您必须将架构定义为

...
    
      "name": "created_at",
      "type": [
        
          "type": "long",
          "logicalType": "timestamp-millis"
        ,
        "null"
      ],
      "default": 1
    
...

这将默认为 1970 年 1 月 1 日(实际上是午夜后 1 毫秒),这可能不是您想要的。

【讨论】:

感谢您的关注,我编辑了问题。不幸的是,这还不足以解决我们的问题

以上是关于具有逻辑类型的 Avro 模式不能与最新的 confluent-kafka 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

BigQuery:加载具有日期列数据类型的 avro 文件,只要转换为时间戳

如何使用最新的 AVRO 模式文件更新配置单元表元数据

Kafka 的嵌套 Avro 类型是不是有最佳实践?

如何在Avro Union逻辑类型字段中指定转换器的默认值?

模式注册表中的递归 avro 模式类型

单一模式中相同类型的 Avro 多条记录