在 Python 中使用“SchemaRegistryClient”反序列化 AVRO 消息

Posted

技术标签:

【中文标题】在 Python 中使用“SchemaRegistryClient”反序列化 AVRO 消息【英文标题】:Using 'SchemaRegistryClient' to deserialize AVRO message in Python 【发布时间】:2020-11-22 06:45:59 【问题描述】:

我们正在尝试使用来自其他系统的 AVRO 消息。 当我使用以下代码将架构指定为文件 (.avsc) 时,我能够读取 AVRO 消息,

import avro.schema
from avro.io import DatumReader, BinaryDecoder
...
schema = avro.schema.Parse(open("schema.avsc", "rb").read())
...
bytes_reader = io.BytesIO(element) # element is the serialized message
decoder = BinaryDecoder(bytes_reader)
reader = DatumReader(schema)
rec = reader.read(decoder)

但是,我现在需要从架构注册表 URL 读取架构,

http://<IP>:<PORT>/subjects/<SUBJECT>/versions/<VERSION>/schema

我正在从传入消息自定义属性“模式”中提取 url。现在要从我使用以下代码的 url 获取架构,

def fetch_schema(IP, subject, version):
    sr = SchemaRegistryClient(IP)
    schema = sr.get_schema(subject, version=version).schema
    return schema

使用上面用于反序列化消息的相同代码,我现在得到以下错误

AttributeError: 'AvroSchema' object has no attribute 'type' 

上线,

rec = reader.read(decoder) 

我比较了从文件读取和从 URL 获取时的“模式”变量的类型,

from file, the schema type is : <class 'avro.schema.RecordSchema'>
from URL, the schema type is : <class 'schema_registry.client.schema.AvroSchema'>

它们是不同的,因此可能是问题所在。在这里寻找一些方向。谢谢!

【问题讨论】:

【参考方案1】:

今天我从avro.schema.RecordSchema 转换为schema_registry.client.schema.AvroSchema 时遇到了同样的问题。 一种可能的解决方案是转储到 JSON,然后使用 Avro 库对其进行解析。

import avro.schema
from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="localhost:8081")
test_table_schema = client.get_schema("table_0_schema").schema

avro_schema = avro.schema.parse(json.dumps(test_table_schema.schema.raw_schema))

reader = DatumReader(avro_schema)

警告:

在使用 KafkaAvro 时,还可能发生其他几个问题:

刷新非 avro 消息的主题。我在解码以 json 格式推送的 Avro 消息时浪费了很多时间,因为 Kafka 消费者有 auto_offset_reset='earliest' 设置。 使用 Confluent 版本的 Debezium 时,可能有 5 个字节专用于架构 ID。您的解码函数应如下所示:
def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    message_bytes.seek(5) # <-----
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

查看this答案了解更多信息。

【讨论】:

【参考方案2】:

看来您需要从 Schema Registry API 调用中获取 JSON 表示,然后您可以像以前一样使用avro.schema.Parse

话虽如此,你可以只使用 urllib 或 requests,而你不需要 SchemaRegistryClient

【讨论】:

以上是关于在 Python 中使用“SchemaRegistryClient”反序列化 AVRO 消息的主要内容,如果未能解决你的问题,请参考以下文章

在 python 中使用 soffice,Command 在终端中有效,但在 Python 子进程中无效

python 使用pymongo在python中使用MongoDB的示例

在 python 中使用命令行时出现语法错误

python 在python中使用全局变量

如何在 Python 3.x 和 Python 2.x 中使用 pip

在Python中使用Redis