如何使用来自 Kafka 的 Python 解码/反序列化 Avro
Posted
技术标签:
【中文标题】如何使用来自 Kafka 的 Python 解码/反序列化 Avro【英文标题】:How to decode/deserialize Avro with Python from Kafka 【发布时间】:2017-11-08 12:28:06 【问题描述】:我正在从远程服务器接收 Python 中的 Kafka Avro 消息(使用 Confluent Kafka Python 库的使用者),这些消息表示带有 json 字典的点击流数据,其中包含用户代理、位置、url 等字段。这是一条消息看起来像:
b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:https://website.in/rooms/\x02Hhttps://website.in/wellness-spa/\x02\xaa\x14\x02\x9c\n\x02\xaa\x14\x02\xd0\x0b\x02V0:j3lcu1if:rTftGozmxSPo96dz1kGH2hvd0CREXmf2\x02V0:j3lj1xt7:YD4daqNRv_Vsea4wuFErpDaWeHu4tW7e\x02\x08null\x02\nnull0\x10pageview\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10Thailand\x02\xa6\x80\xc4\x01\x02\x0eBangkok\x02\x8c\xba\xc4\x01\x020*\xa9\x13\xd0\x84+@\x02\xec\xc09#J\x1fY@\x02\x8a\x02Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (Khtml, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36\x02\x10Chromium\x02\x10Chromium\x028Google Inc. and contributors\x02\x0eBrowser\x02\x1858.0.3029.96\x02"Personal computer\x02\nLinux\x02\x00\x02\x1cCanonical Ltd.'
如何解码?我尝试了 bson 解码,但该字符串未被识别为 UTF-8,因为我猜它是一种特定的 Avro 编码。我找到了https://github.com/verisign/python-confluent-schemaregistry,但它只支持 Python 2.7。理想情况下,我希望使用 Python 3.5+ 和 MongoDB 来处理数据并将其存储为我当前的基础架构。
【问题讨论】:
【参考方案1】:我以为 Avro 库只是为了读取 Avro 文件,但它实际上解决了解码 Kafka 消息的问题,如下:我首先导入库并将模式文件作为参数,然后创建一个函数来解码消息到字典中,我可以在消费者循环中使用。
import io
from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema
schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
c = Consumer()
c.subscribe(topic)
running = True
while running:
msg = c.poll()
if not msg.error():
msg_value = msg.value()
event_dict = decode(msg_value)
print(event_dict)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
【讨论】:
我在尝试导入 avroFile "/usr/lib64/python3.6/tokenize.py", line 27, in <module> ImportError: cannot import name 'open'
时收到此错误您知道可能出了什么问题吗?
io 是在哪里定义的?
刚刚添加了缺少的“import io”谢谢
而“open()”是Python内置函数,所以应该不用导入就可以调用
请注意,方法名称(至少使用 avro 1.10.1)是 avro.schema.parse() 而不是 Parse() :)【参考方案2】:
如果您使用 Confluent Schema Registry 并想反序列化 avro 消息,只需将 message_bytes.seek(5) 添加到 decode 函数中,因为 Confluent 在典型的 avro 格式数据之前添加了 5 个额外字节。
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
message_bytes.seek(5)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
【讨论】:
谢谢你,我想我自己不会想到这个...... 谢谢!我被那个阻止了;-) 哇,这太疯狂了——但非常感谢你的修复【参考方案3】:如果您可以访问 Confluent 架构注册服务器,您还可以使用 Confluent 自己的AvroDeserializer
来避免弄乱他们神奇的 5 个字节:
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
def process_record_confluent(record: bytes, src: SchemaRegistryClient, schema: str):
deserializer = AvroDeserializer(schema_str=schema, schema_registry_client=src)
return deserializer(record, None) # returns dict
【讨论】:
需要注意的是confluent_kafka
不会在 Windows 上安装到 Python(我相信)
@bunkerdive 我已经让它在 WSL 2 上运行。有几个陷阱,但它对于开发目的来说已经足够好了。它不适用于生产用途。 confluent.io/blog/set-up-and-run-kafka-on-windows-and-wsl-2
如果我们没有 Confluent 模式注册服务器?以上是关于如何使用来自 Kafka 的 Python 解码/反序列化 Avro的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Avro 二进制编码器对 Kafka 消息进行编码/解码?
在 Python 中使用来自 Gravitee 的公钥解码 JWT 令牌时出现问题
即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]