在 Python 中使用“SchemaRegistryClient”反序列化 AVRO 消息
Posted
技术标签:
【中文标题】在 Python 中使用“SchemaRegistryClient”反序列化 AVRO 消息【英文标题】:Using 'SchemaRegistryClient' to deserialize AVRO message in Python 【发布时间】:2020-11-22 06:45:59 【问题描述】:我们正在尝试使用来自其他系统的 AVRO 消息。 当我使用以下代码将架构指定为文件 (.avsc) 时,我能够读取 AVRO 消息,
import avro.schema
from avro.io import DatumReader, BinaryDecoder
...
schema = avro.schema.Parse(open("schema.avsc", "rb").read())
...
bytes_reader = io.BytesIO(element) # element is the serialized message
decoder = BinaryDecoder(bytes_reader)
reader = DatumReader(schema)
rec = reader.read(decoder)
但是,我现在需要从架构注册表 URL 读取架构,
http://<IP>:<PORT>/subjects/<SUBJECT>/versions/<VERSION>/schema
我正在从传入消息自定义属性“模式”中提取 url。现在要从我使用以下代码的 url 获取架构,
def fetch_schema(IP, subject, version):
sr = SchemaRegistryClient(IP)
schema = sr.get_schema(subject, version=version).schema
return schema
使用上面用于反序列化消息的相同代码,我现在得到以下错误
AttributeError: 'AvroSchema' object has no attribute 'type'
上线,
rec = reader.read(decoder)
我比较了从文件读取和从 URL 获取时的“模式”变量的类型,
from file, the schema type is : <class 'avro.schema.RecordSchema'>
from URL, the schema type is : <class 'schema_registry.client.schema.AvroSchema'>
它们是不同的,因此可能是问题所在。在这里寻找一些方向。谢谢!
【问题讨论】:
【参考方案1】:今天我从avro.schema.RecordSchema
转换为schema_registry.client.schema.AvroSchema
时遇到了同样的问题。
一种可能的解决方案是转储到 JSON,然后使用 Avro 库对其进行解析。
import avro.schema
from schema_registry.client import SchemaRegistryClient
client = SchemaRegistryClient(url="localhost:8081")
test_table_schema = client.get_schema("table_0_schema").schema
avro_schema = avro.schema.parse(json.dumps(test_table_schema.schema.raw_schema))
reader = DatumReader(avro_schema)
警告:
在使用 Kafka 和 Avro 时,还可能发生其他几个问题:
刷新非 avro 消息的主题。我在解码以 json 格式推送的 Avro 消息时浪费了很多时间,因为 Kafka 消费者有auto_offset_reset='earliest'
设置。
使用 Confluent 版本的 Debezium 时,可能有 5 个字节专用于架构 ID。您的解码函数应如下所示:
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
message_bytes.seek(5) # <-----
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
查看this答案了解更多信息。
【讨论】:
【参考方案2】:看来您需要从 Schema Registry API 调用中获取 JSON 表示,然后您可以像以前一样使用avro.schema.Parse
。
话虽如此,你可以只使用 urllib 或 requests,而你不需要 SchemaRegistryClient
【讨论】:
以上是关于在 Python 中使用“SchemaRegistryClient”反序列化 AVRO 消息的主要内容,如果未能解决你的问题,请参考以下文章
在 python 中使用 soffice,Command 在终端中有效,但在 Python 子进程中无效
python 使用pymongo在python中使用MongoDB的示例