如何使用来自 Kafka 的 Python 解码/反序列化 Avro

Posted

技术标签:

【中文标题】如何使用来自 Kafka 的 Python 解码/反序列化 Avro【英文标题】:How to decode/deserialize Avro with Python from Kafka 【发布时间】:2017-11-08 12:28:06 【问题描述】:

我正在从远程服务器接收 Python 中的 Kafka Avro 消息(使用 Confluent Kafka Python 库的使用者),这些消息表示带有 json 字典的点击流数据,其中包含用户代理、位置、url 等字段。这是一条消息看起来像:

b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:https://website.in/rooms/\x02Hhttps://website.in/wellness-spa/\x02\xaa\x14\x02\x9c\n\x02\xaa\x14\x02\xd0\x0b\x02V0:j3lcu1if:rTftGozmxSPo96dz1kGH2hvd0CREXmf2\x02V0:j3lj1xt7:YD4daqNRv_Vsea4wuFErpDaWeHu4tW7e\x02\x08null\x02\nnull0\x10pageview\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10Thailand\x02\xa6\x80\xc4\x01\x02\x0eBangkok\x02\x8c\xba\xc4\x01\x020*\xa9\x13\xd0\x84+@\x02\xec\xc09#J\x1fY@\x02\x8a\x02Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (Khtml, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36\x02\x10Chromium\x02\x10Chromium\x028Google Inc. and contributors\x02\x0eBrowser\x02\x1858.0.3029.96\x02"Personal computer\x02\nLinux\x02\x00\x02\x1cCanonical Ltd.'

如何解码?我尝试了 bson 解码,但该字符串未被识别为 UTF-8,因为我猜它是一种特定的 Avro 编码。我找到了https://github.com/verisign/python-confluent-schemaregistry,但它只支持 Python 2.7。理想情况下,我希望使用 Python 3.5+ 和 MongoDB 来处理数据并将其存储为我当前的基础架构。

【问题讨论】:

【参考方案1】:

我以为 Avro 库只是为了读取 Avro 文件,但它实际上解决了解码 Kafka 消息的问题,如下:我首先导入库并将模式文件作为参数,然后创建一个函数来解码消息到字典中,我可以在消费者循环中使用。

import io

from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

c = Consumer()
c.subscribe(topic)
running = True
while running:
    msg = c.poll()
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False

【讨论】:

我在尝试导入 avro File "/usr/lib64/python3.6/tokenize.py", line 27, in <module> ImportError: cannot import name 'open' 时收到此错误您知道可能出了什么问题吗? io 是在哪里定义的? 刚刚添加了缺少的“import io”谢谢 而“open()”是Python内置函数,所以应该不用导入就可以调用 请注意,方法名称(至少使用 avro 1.10.1)是 avro.schema.parse() 而不是 Parse() :)【参考方案2】:

如果您使用 Confluent Schema Registry 并想反序列化 avro 消息,只需将 message_bytes.seek(5) 添加到 decode 函数中,因为 Confluent 在典型的 avro 格式数据之前添加了 5 个额外字节。

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    message_bytes.seek(5)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

【讨论】:

谢谢你,我想我自己不会想到这个...... 谢谢!我被那个阻止了;-) 哇,这太疯狂了——但非常感谢你的修复【参考方案3】:

如果您可以访问 Confluent 架构注册服务器,您还可以使用 Confluent 自己的AvroDeserializer 来避免弄乱他们神奇的 5 个字节:

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

def process_record_confluent(record: bytes, src: SchemaRegistryClient, schema: str):
    deserializer = AvroDeserializer(schema_str=schema, schema_registry_client=src)
    return deserializer(record, None) # returns dict

【讨论】:

需要注意的是confluent_kafka 不会在 Windows 上安装到 Python(我相信) @bunkerdive 我已经让它在 WSL 2 上运行。有几个陷阱,但它对于开发目的来说已经足够好了。它不适用于生产用途。 confluent.io/blog/set-up-and-run-kafka-on-windows-and-wsl-2 如果我们没有 Confluent 模式注册服务器?

以上是关于如何使用来自 Kafka 的 Python 解码/反序列化 Avro的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Avro 二进制编码器对 Kafka 消息进行编码/解码?

如何使用 python 解码 SSL 证书?

在 Python 中使用来自 Gravitee 的公钥解码 JWT 令牌时出现问题

即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]

如何在 python 中扩展 Kafka 消费者?

在 Python 3.6 中解码来自 API 的文本响应