用于从流中读取多个 protobuf 消息的 python 示例
Posted
技术标签:
【中文标题】用于从流中读取多个 protobuf 消息的 python 示例【英文标题】:python example for reading multiple protobuf messages from a stream 【发布时间】:2012-07-14 03:26:18 【问题描述】:我正在处理来自 spinn3r 的数据,它由多个不同的 protobuf 消息序列化成一个字节流:
http://code.google.com/p/spinn3r-client/wiki/Protostream
“protostream 是协议缓冲区消息的流,根据 Google 协议缓冲区规范在线上编码为长度前缀的 varint。流包含三个部分:标头、有效负载和尾部标记。”
这似乎是一个非常标准的 protobuf 用例。事实上,protobuf 核心发行版为 C++ 和 Java 提供了 CodedInputStream。但是,protobuf 似乎没有为 python 提供这样的工具——“内部”工具不是为这种外部使用设置的:
https://groups.google.com/forum/?fromgroups#!topic/protobuf/xgmUqXVsK-o
所以......在我去拼凑一个python varint解析器和用于解析不同消息类型流的工具之前:有人知道有什么工具吗?
为什么 protobuf 中缺少它? (或者我只是没有找到它?)
这对于 protobuf 来说似乎是一个很大的差距,尤其是与 thrift 的“传输”和“协议”等效工具相比。我看对了吗?
【问题讨论】:
developers.google.com/protocol-buffers/docs/reference/… 呢? 【参考方案1】:看起来另一个答案中的代码可能来自here。在使用此文件之前检查许可证,但我设法让它使用如下代码读取varint32
s:
import sys
import myprotocol_pb2 as proto
import varint # (this is the varint.py file)
data = open("filename.bin", "rb").read() # read file as string
decoder = varint.decodeVarint32 # get a varint32 decoder
# others are available in varint.py
next_pos, pos = 0, 0
while pos < len(data):
msg = proto.Msg() # your message type
next_pos, pos = decoder(data, pos)
msg.ParseFromString(data[pos:pos + next_pos])
# use parsed message
pos += next_pos
print "done!"
这是一个非常简单的代码,旨在加载由varint32
s 分隔的单一类型的消息,描述下一条消息的大小。
更新:也可以使用以下方法直接从 protobuf 库中包含此文件:
from google.protobuf.internal.decoder import _DecodeVarint32
【讨论】:
对于较新的东西,我通过from google.protobuf.internal.decoder import _DecodeVarint32
获得解码器
我最近不得不多次这样做。我无法弄清楚msg.type == proto.Msg.END
的条件,但只需执行while pos < len(data):
对我来说效果很好。
当然,有道理,答案已更新。谢谢@Moodragonx
只是好奇,这看起来是在每个循环中创建一个Msg
类型的新对象,Google 这样做的理由是什么?这开销贵吗?
@Tommy 为了澄清这不是谷歌代码,这是我的代码作为如何在 Python 中解决此问题的示例。我不是 Python 专家,但即使您清除 Message 对象以重用它,它的二进制数据仍然必须从流中复制。如果你想要一个无副本的解决方案,也许 Python 不是你要走的路。我主要在 C++ 中使用 protobufs。【参考方案2】:
我已经实现了一个small python package 来将多个protobuf 消息序列化到一个流中并从一个流中反序列化它们。可以通过pip
安装:
pip install pystream-protobuf
这是一个将两个 protobuf 消息列表写入文件的示例代码:
import stream
with stream.open("test.gam", "wb") as ostream:
ostream.write(*objects_list)
ostream.write(*another_objects_list)
然后从流中读取相同的消息(例如,vg_pb2.py
中定义的对齐消息):
import stream
import vg_pb2
alns_list = []
with stream.open("test.gam", "rb") as istream:
for data in istream:
aln = vg_pb2.Alignment()
aln.ParseFromString(data)
alns_list.append(aln)
【讨论】:
这个包提供了内置的 gzip 压缩。【参考方案3】:这很简单,我可以理解为什么没有人费心制作可重复使用的工具:
'''
Parses multiple protobuf messages from a stream of spinn3r data
'''
import sys
sys.path.append('python_proto/src')
import spinn3rApi_pb2
import protoStream_pb2
data = open('8mny44bs6tYqfnofg0ELPg.protostream').read()
def _VarintDecoder(mask):
'''Like _VarintDecoder() but decodes signed values.'''
local_ord = ord
def DecodeVarint(buffer, pos):
result = 0
shift = 0
while 1:
b = local_ord(buffer[pos])
result |= ((b & 0x7f) << shift)
pos += 1
if not (b & 0x80):
if result > 0x7fffffffffffffff:
result -= (1 << 64)
result |= ~mask
else:
result &= mask
return (result, pos)
shift += 7
if shift >= 64:
## need to create (and also catch) this exception class...
raise _DecodeError('Too many bytes when decoding varint.')
return DecodeVarint
## get a 64bit varint decoder
decoder = _VarintDecoder((1<<64) - 1)
## get the three types of protobuf messages we expect to see
header = protoStream_pb2.ProtoStreamHeader()
delimiter = protoStream_pb2.ProtoStreamDelimiter()
entry = spinn3rApi_pb2.Entry()
## get the header
pos = 0
next_pos, pos = decoder(data, pos)
header.ParseFromString(data[pos:pos + next_pos])
## should check its contents
while 1:
pos += next_pos
next_pos, pos = decoder(data, pos)
delimiter.ParseFromString(data[pos:pos + next_pos])
if delimiter.delimiter_type == delimiter.END:
break
pos += next_pos
next_pos, pos = decoder(data, pos)
entry.ParseFromString(data[pos:pos + next_pos])
print entry
【讨论】:
以上是关于用于从流中读取多个 protobuf 消息的 python 示例的主要内容,如果未能解决你的问题,请参考以下文章