Python AVRO reader returns AssertionError when decoding Kafka messages
Posted: 2020-02-16 18:21:09
Question: I'm a newbie playing with Kafka and AVRO.
I am trying to deserialize AVRO messages in Python 3.7.3 using the kafka-python and avro-python3 packages, following this answer.
The function responsible for decoding the Kafka messages is
    def decode_message(msg_value, reader):
        from io import BytesIO
        from avro.io import BinaryDecoder
        message_bytes = BytesIO(msg_value)
        decoder = BinaryDecoder(message_bytes)
        event_dict = reader.read(decoder)
        return event_dict
where reader is an avro.io.DatumReader instance, defined as:
    def create_reader(filename_path):
        from avro.io import DatumReader
        import avro.schema
        # Read the schema file and close it again once parsed
        with open(filename_path) as schema_file:
            schema = avro.schema.Parse(schema_file.read())
        reader = DatumReader(schema)
        return reader
Unfortunately, it fails. Here is the traceback:
    <_io.BytesIO object at 0x7fab73fe5530>
    <avro.io.BinaryDecoder object at 0x7fab74300090>
    Traceback (most recent call last):
      File "app.py", line 19, in <module>
        kfk.read_messages(kafka_consumer, avro_reader)
      File "/app/modules/consume_kafka.py", line 17, in read_messages
        decoded_message = decode_message(msg_value, reader)
      File "/app/modules/consume_kafka.py", line 50, in decode_message
        event_dict = reader.read(decoder)
      File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 489, in read
        return self.read_data(self.writer_schema, self.reader_schema, decoder)
      File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 534, in read_data
        return self.read_record(writer_schema, reader_schema, decoder)
      File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 734, in read_record
        field_val = self.read_data(field.type, readers_field.type, decoder)
      File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 512, in read_data
        return decoder.read_utf8()
      File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 257, in read_utf8
        input_bytes = self.read_bytes()
      File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 249, in read_bytes
        assert (nbytes >= 0), nbytes
    AssertionError: -40
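The -40 in the assertion can be reproduced by hand. Avro's binary encoding stores a string as a zig-zag, variable-length long (the byte count) followed by the UTF-8 bytes, so the reader interprets the very first byte of the payload as the length of the `name` field. The message printed in the question begins with b'Obj\x01', and decoding its first byte b'O' (0x4F) as a zig-zag long gives exactly -40. The sketch below illustrates this with the standard library only; `read_avro_long` is a hypothetical helper written for this illustration, not a function from the avro package.

```python
def read_avro_long(data, pos=0):
    """Decode one Avro long (variable-length, zig-zag) starting at data[pos].

    Returns the decoded value and the position of the next unread byte.
    """
    byte = data[pos]
    value = byte & 0x7F
    shift = 7
    while byte & 0x80:  # high bit set: another byte follows
        pos += 1
        byte = data[pos]
        value |= (byte & 0x7F) << shift
        shift += 7
    # Undo the zig-zag mapping to recover the signed value
    return (value >> 1) ^ -(value & 1), pos + 1


# First bytes of the Kafka message shown in the question
length, next_pos = read_avro_long(b"Obj\x01")
print(length)  # -40
```

In other words, the decoder is probably not hitting corrupt data; it is reading a payload that does not start with a bare binary-encoded record.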
I am able to read the message, and it looks like
b'Obj\x01\x04\x14avro.codec\x08null\x16avro.schema\xbe\t"type":"record","name":"tracks","namespace":"integration","fields":["name":"name","type":"string","name":"data","type":["type":"record","name":"track_upload_verified","namespace":"integration.tracks","fields":["name":"track_id","type":"string","name":"audio_filename","type":"string","name":"track_type","type":"string"],"type":"record","name":"audio_processed","namespace":"integration.tracks","fields":["name":"track_id","type":"string","name":"audio_mp3_filename","type":"string","name":"waveform_samples","type":"type":"array","items":"int","name":"duration","type":"string"]]]\x00\xc4\x8ad\xceF\x9c\xef\x99\n#y7\x96\xba\xb4\x02\xe2\x01*track_upload_verified\x00H341aa6a3-5ecb-4ac0-8f27-bc2fe5abc9d4^tracks-audio/-1khgyI4kYfSf8hq2XiXZjg-1569510465\x08main\xc4\x8ad\xceF\x9c\xef\x99\n#y7\x96\xba\xb4'
which is what I expect, i.e. raw bytes.
I am fairly confident about the schema, since I validated it with this tool.
Has anyone run into a similar problem?
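A note on the message bytes: the leading b'Obj\x01' is the magic marker of an Avro object container file, and the payload also carries the avro.codec and avro.schema header entries. That suggests the producer wrote a whole container file (header, embedded schema, sync markers) into each Kafka message, whereas DatumReader with a BinaryDecoder expects a bare binary-encoded record. A minimal sketch of one way to handle this, assuming avro-python3 is installed and that each message really is a complete container file; `is_avro_container` and `decode_container_message` are hypothetical helper names, not part of any library:

```python
from io import BytesIO

# First four bytes of every Avro object container file
AVRO_CONTAINER_MAGIC = b"Obj\x01"


def is_avro_container(msg_value):
    """Return True if the payload starts with the container-file magic."""
    return msg_value[:4] == AVRO_CONTAINER_MAGIC


def decode_container_message(msg_value):
    """Read every record from a payload holding a complete container file."""
    # Imported lazily (as in the question) so the check above works
    # even when avro is not installed
    from avro.datafile import DataFileReader
    from avro.io import DatumReader

    # No schema is passed in: the writer schema comes from the file header
    file_reader = DataFileReader(BytesIO(msg_value), DatumReader())
    try:
        return list(file_reader)
    finally:
        file_reader.close()
```

If the producer is under your control, the alternative is to have it write bare records with a DatumWriter and BinaryEncoder, in which case the original decode_message should work unchanged.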
Comments:
I would suggest using Confluent Schema Registry and their Python client to handle Avro.
Answer 1:
It looks like corruption in the Avro file, but it is hard to tell from your question and input.
Try the following:
- Install aws_schema_registry.
- Import the schema class: from aws_schema_registry.avro import AvroSchema
- Initialize it: schema = AvroSchema(my_schema)
- Now, with the bytes event from Kafka: result = schema.read(message)