Java 中协议缓冲区分隔的 I/O 函数是不是有 C++ 等效项?
Posted
技术标签:
【中文标题】Java 中协议缓冲区分隔的 I/O 函数是不是有 C++ 等效项?【英文标题】:Are there C++ equivalents for the Protocol Buffers delimited I/O functions in Java?Java 中协议缓冲区分隔的 I/O 函数是否有 C++ 等效项? 【发布时间】:2011-01-21 09:28:51 【问题描述】:我正在尝试从 C++ 和 Java 文件中读取/写入多个协议缓冲区消息。谷歌建议在消息之前写长度前缀,但默认情况下没有办法这样做(我可以看到)。
但是,2.1.0 版中的 Java API 收到了一组“定界”I/O 函数,它们显然可以完成这项工作:
parseDelimitedFrom
mergeDelimitedFrom
writeDelimitedTo
有 C++ 等价物吗?如果没有,Java API 附加的大小前缀的有线格式是什么,以便我可以在 C++ 中解析这些消息?
更新:
这些现在存在于google/protobuf/util/delimited_message_util.h
v3.3.0 中。
【问题讨论】:
我不知道,但它是开源的,所以你可以从源代码中找到。 是的,这就是我最终所做的。 :) 请参阅下面的答案。 自 v3.3.0 起,google::protobuf::util 为 MessageLite 提供了分隔消息方法。 @KenjiNoguchi 感谢您的提示!我更新了问题以包含它。 【参考方案1】:我在这里聚会有点晚了,但是下面的实现包括其他答案中缺少的一些优化,并且在输入 64MB 后不会失败(尽管它仍然对每条消息强制执行 the 64MB limit,只是没有整个流)。
(我是 C++ 和 Java protobuf 库的作者,但我不再为 Google 工作。很抱歉,这段代码从未进入官方库。如果有的话,这就是它的样子。)
bool writeDelimitedTo(
const google::protobuf::MessageLite& message,
google::protobuf::io::ZeroCopyOutputStream* rawOutput)
// We create a new coded stream for each message. Don't worry, this is fast.
google::protobuf::io::CodedOutputStream output(rawOutput);
// Write the size.
const int size = message.ByteSize();
output.WriteVarint32(size);
uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
if (buffer != NULL)
// Optimization: The message fits in one buffer, so use the faster
// direct-to-array serialization path.
message.SerializeWithCachedSizesToArray(buffer);
else
// Slightly-slower path when the message is multiple buffers.
message.SerializeWithCachedSizes(&output);
if (output.HadError()) return false;
return true;
bool readDelimitedFrom(
google::protobuf::io::ZeroCopyInputStream* rawInput,
google::protobuf::MessageLite* message)
// We create a new coded stream for each message. Don't worry, this is fast,
// and it makes sure the 64MB total size limit is imposed per-message rather
// than on the whole stream. (See the CodedInputStream interface for more
// info on this limit.)
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
// Parse the message.
if (!message->MergeFromCodedStream(&input)) return false;
if (!input.ConsumedEntireMessage()) return false;
// Release the limit.
input.PopLimit(limit);
return true;
【讨论】:
嘿,感谢肯顿的回答!我会转而接受这个而不是我自己的。虽然我怀疑此时最好的答案是使用 Cap'n Proto 代替? :) 另外 - 为什么不将它合并到 code.google 上的官方 protobuf 库中? 使用 varint 标头的一个缺点是很难指定异步 API(如 ASIO)在读取整个标头时通知您。使用固定大小的整数是微不足道的:您只需要求等到收到 4 个字节(使用 asio::transfer_at_least)。对于 varint,您希望针对一次读取整个标头的常见情况进行优化,同时避免在有人发送具有高位设置的无限字节流时出现二次行为。另外,对我来说,所有这些逻辑对于套接字读取代码来说都感觉有点太高级了。 哦,是的,我应该提一下三个月前我向 protobuf 提交了一个拉取请求以添加这些功能......但尚未被接受:github.com/google/protobuf/pull/710 @fireboot 抱歉,我没有编写 Python 库,所以对它不太熟悉。我必须挖一段时间才能弄清楚,不幸的是我没有时间。 :/ 不过,我可能可以验证其他人生成的代码。【参考方案2】:好的,所以我还没有找到实现我需要的*** C++ 函数,但是通过 Java API 参考进行了一些探索,在 MessageLite 接口内发现了以下内容:
void writeDelimitedTo(OutputStream output)
/* Like writeTo(OutputStream), but writes the size of
the message as a varint before writing the data. */
所以 Java 大小前缀是一个(Protocol Buffers)变量!
有了这些信息,我开始挖掘 C++ API 并找到 CodedStream 标头,其中包含以下内容:
bool CodedInputStream::ReadVarint32(uint32 * value)
void CodedOutputStream::WriteVarint32(uint32 value)
使用这些,我应该能够推出自己的 C++ 函数来完成这项工作。
不过,他们确实应该将此添加到主消息 API;考虑到 Java 有它,它缺少功能,Marc Gravell 的优秀 protobuf-net C# 端口(通过 SerializeWithLengthPrefix 和 DeserializeWithLengthPrefix)也是如此。
【讨论】:
是的。这就是我解决这个问题的方法。我添加了另一个答案,其中包含一些用于编写消息的示例伪代码。【参考方案3】:我解决了同样的问题,使用 CodedOutputStream/ArrayOutputStream 写入消息(带有大小)和 CodedInputStream/ArrayInputStream 读取消息(带有大小)。
例如,以下伪代码在消息后面写入消息大小:
const unsigned bufLength = 256;
unsigned char buffer[bufLength];
Message protoMessage;
google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);
codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
protoMessage.SerializeToCodedStream(&codedOutput);
在编写时,您还应该检查缓冲区是否足够大以容纳消息(包括大小)。并且在阅读时,您应该检查您的缓冲区是否包含完整的消息(包括大小)。
如果他们在 C++ API 中添加类似于 Java API 提供的便利方法,那肯定会很方便。
【讨论】:
我将使用底层OstreamOutputStream
,因此不需要进行长度检查,但感谢您的回答。 :) 在你的情况下,我可能会将 bufLength
设置为 protoMessage.ByteSize()
加上一些额外的大小前缀。【参考方案4】:
IsteamInputStream 对 eofs 和与 std::istream 一起使用时容易发生的其他错误非常脆弱。在此之后,protobuf 流被永久损坏,并且任何已使用的缓冲区数据都被破坏。对 protobuf 中的传统流的读取提供了适当的支持。
实现google::protobuf::io::CopyingInputStream
并将其与CopyingInputStreamAdapter 一起使用。对输出变量执行相同操作。
实际上,解析调用以google::protobuf::io::CopyingInputStream::Read(void* buffer, int size)
结束,其中给出了缓冲区。剩下要做的就是以某种方式读入它。
这是一个使用 Asio 同步流 (SyncReadStream/SyncWriteStream) 的示例:
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
using namespace google::protobuf::io;
template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream
public:
AsioInputStream(SyncReadStream& sock);
int Read(void* buffer, int size);
private:
SyncReadStream& m_Socket;
;
template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
m_Socket(sock)
template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
std::size_t bytes_read;
boost::system::error_code ec;
bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);
if(!ec)
return bytes_read;
else if (ec == boost::asio::error::eof)
return 0;
else
return -1;
template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream
public:
AsioOutputStream(SyncWriteStream& sock);
bool Write(const void* buffer, int size);
private:
SyncWriteStream& m_Socket;
;
template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
m_Socket(sock)
template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
boost::system::error_code ec;
m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
return !ec;
用法:
AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
CopyingInputStreamAdaptor cis_adp(&ais);
CodedInputStream cis(&cis_adp);
Message protoMessage;
uint32_t msg_size;
/* Read message size */
if(!cis.ReadVarint32(&msg_size))
// Handle error
/* Make sure not to read beyond limit of message */
CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
if(!msg.ParseFromCodedStream(&cis))
// Handle error
/* Remove limit */
cis.PopLimit(msg_limit);
【讨论】:
这是一个巨大的帮助。我曾尝试使用 asio istream/ostream 接口在套接字上执行 protobuf,并将它们包装在 IStreamInputStream/OStreamOutputStream 中,但无法使其正常工作。感谢您发布此信息。有了它和 Kenton 的功能,您可以相当轻松地构建一个客户端/服务器来使用 asio 在 c++ 中与 protobuf 对话。【参考方案5】:给你:
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
using namespace google::protobuf::io;
class FASWriter
std::ofstream mFs;
OstreamOutputStream *_OstreamOutputStream;
CodedOutputStream *_CodedOutputStream;
public:
FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary)
assert(mFs.good());
_OstreamOutputStream = new OstreamOutputStream(&mFs);
_CodedOutputStream = new CodedOutputStream(_OstreamOutputStream);
inline void operator()(const ::google::protobuf::Message &msg)
_CodedOutputStream->WriteVarint32(msg.ByteSize());
if ( !msg.SerializeToCodedStream(_CodedOutputStream) )
std::cout << "SerializeToCodedStream error " << std::endl;
~FASWriter()
delete _CodedOutputStream;
delete _OstreamOutputStream;
mFs.close();
;
class FASReader
std::ifstream mFs;
IstreamInputStream *_IstreamInputStream;
CodedInputStream *_CodedInputStream;
public:
FASReader(const std::string &file), mFs(file,std::ios::in | std::ios::binary)
assert(mFs.good());
_IstreamInputStream = new IstreamInputStream(&mFs);
_CodedInputStream = new CodedInputStream(_IstreamInputStream);
template<class T>
bool ReadNext()
T msg;
unsigned __int32 size;
bool ret;
if ( ret = _CodedInputStream->ReadVarint32(&size) )
CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size);
if ( ret = msg.ParseFromCodedStream(_CodedInputStream) )
_CodedInputStream->PopLimit(msgLimit);
std::cout << mFeed << " FASReader ReadNext: " << msg.DebugString() << std::endl;
return ret;
~FASReader()
delete _CodedInputStream;
delete _IstreamInputStream;
mFs.close();
;
【讨论】:
【参考方案6】:我在 C++ 和 Python 中都遇到了同样的问题。
对于 C++ 版本,我混合使用了 Kenton Varda 在此线程上发布的代码和他发送给 protobuf 团队的拉取请求中的代码(因为这里发布的版本不处理 EOF 而他发送的版本到 github 确实)。
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>
bool writeDelimitedTo(const google::protobuf::MessageLite& message,
google::protobuf::io::ZeroCopyOutputStream* rawOutput)
// We create a new coded stream for each message. Don't worry, this is fast.
google::protobuf::io::CodedOutputStream output(rawOutput);
// Write the size.
const int size = message.ByteSize();
output.WriteVarint32(size);
uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
if (buffer != NULL)
// Optimization: The message fits in one buffer, so use the faster
// direct-to-array serialization path.
message.SerializeWithCachedSizesToArray(buffer);
else
// Slightly-slower path when the message is multiple buffers.
message.SerializeWithCachedSizes(&output);
if (output.HadError())
return false;
return true;
bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
// We create a new coded stream for each message. Don't worry, this is fast,
// and it makes sure the 64MB total size limit is imposed per-message rather
// than on the whole stream. (See the CodedInputStream interface for more
// info on this limit.)
google::protobuf::io::CodedInputStream input(rawInput);
const int start = input.CurrentPosition();
if (clean_eof)
*clean_eof = false;
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size))
if (clean_eof)
*clean_eof = input.CurrentPosition() == start;
return false;
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);
// Parse the message.
if (!message->MergeFromCodedStream(&input)) return false;
if (!input.ConsumedEntireMessage()) return false;
// Release the limit.
input.PopLimit(limit);
return true;
这是我的 python2 实现:
from google.protobuf.internal import encoder
from google.protobuf.internal import decoder
#I had to implement this because the tools in google.protobuf.internal.decoder
#read from a buffer, not from a file-like objcet
def readRawVarint32(stream):
mask = 0x80 # (1 << 7)
raw_varint32 = []
while 1:
b = stream.read(1)
#eof
if b == "":
break
raw_varint32.append(b)
if not (ord(b) & mask):
#we found a byte starting with a 0, which means it's the last byte of this varint
break
return raw_varint32
def writeDelimitedTo(message, stream):
message_str = message.SerializeToString()
delimiter = encoder._VarintBytes(len(message_str))
stream.write(delimiter + message_str)
def readDelimitedFrom(MessageType, stream):
raw_varint32 = readRawVarint32(stream)
message = None
if raw_varint32:
size, _ = decoder._DecodeVarint32(raw_varint32, 0)
data = stream.read(size)
if len(data) < size:
raise Exception("Unexpected end of file")
message = MessageType()
message.ParseFromString(data)
return message
#In place version that takes an already built protobuf object
#In my tests, this is around 20% faster than the other version
#of readDelimitedFrom()
def readDelimitedFrom_inplace(message, stream):
raw_varint32 = readRawVarint32(stream)
if raw_varint32:
size, _ = decoder._DecodeVarint32(raw_varint32, 0)
data = stream.read(size)
if len(data) < size:
raise Exception("Unexpected end of file")
message.ParseFromString(data)
return message
else:
return None
它可能不是最好看的代码,我相信它可以重构一些,但至少应该向您展示一种方法。
现在最大的问题是:SLOW。
即使使用 python-protobuf 的 C++ 实现,它也比纯 C++ 慢一个数量级。我有一个基准测试,我从一个文件中读取了 10M 条约 30 字节的 protobuf 消息。在 C++ 中大约需要 0.9 秒,在 python 中需要 35 秒。
使其速度更快的一种方法是重新实现 varint 解码器,使其从文件中读取并一次性解码,而不是像当前代码那样从文件中读取然后解码。 (分析显示大量时间花费在 varint 编码器/解码器中)。但不用说,仅凭这一点还不足以缩小python版本和C++版本的差距。
非常欢迎任何让它更快的想法:)
【讨论】:
有一个普遍的问题,为什么在 Java/Python/C++ 中有不同的编码/解码实现。我不明白为什么 C++ 中没有基本实现,它只是在 Java/Python 中调用... 您的 python 代码在使用 Python3 时似乎不起作用。您需要读取字节而不是字符串,decoder
才能工作。
是的,这段代码是为 python 2 编写的,但它应该很容易适应它并使其适用于 python 3。我已经编辑了我的帖子以表明这段代码针对 python 2。
你能确认流在python中是StringIO类型吗【参考方案7】:
为了完整起见,我在这里发布了一个最新版本,它可以与 protobuf 和 Python3 的主版本一起使用
对于 C++ 版本,使用 delimited_message_utils.h 中的 utils 就足够了,这里是 MWE
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/util/delimited_message_util.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
template <typename T>
bool writeManyToFile(std::deque<T> messages, std::string filename)
int outfd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC);
google::protobuf::io::FileOutputStream fout(outfd);
bool success;
for (auto msg: messages)
success = google::protobuf::util::SerializeDelimitedToZeroCopyStream(
msg, &fout);
if (! success)
std::cout << "Writing Failed" << std::endl;
break;
fout.Close();
close(outfd);
return success;
template <typename T>
std::deque<T> readManyFromFile(std::string filename)
int infd = open(filename.c_str(), O_RDONLY);
google::protobuf::io::FileInputStream fin(infd);
bool keep = true;
bool clean_eof = true;
std::deque<T> out;
while (keep)
T msg;
keep = google::protobuf::util::ParseDelimitedFromZeroCopyStream(
&msg, &fin, nullptr);
if (keep)
out.push_back(msg);
fin.Close();
close(infd);
return out;
对于 Python3 版本,基于 @fireboot 的回答,唯一需要修改的是 raw_varint32 的解码
def getSize(raw_varint32):
result = 0
shift = 0
b = six.indexbytes(raw_varint32, 0)
result |= ((ord(b) & 0x7f) << shift)
return result
def readDelimitedFrom(MessageType, stream):
raw_varint32 = readRawVarint32(stream)
message = None
if raw_varint32:
size = getSize(raw_varint32)
data = stream.read(size)
if len(data) < size:
raise Exception("Unexpected end of file")
message = MessageType()
message.ParseFromString(data)
return message
【讨论】:
这个解决方案应该得到更高的评价。较旧的答案并未反映 protobuf C++ 库如何演变以解决明显的缺陷。【参考方案8】:也在为此寻找解决方案。这是我们解决方案的核心,假设一些 java 代码将许多带有writeDelimitedTo
的 MyRecord 消息写入文件。打开文件并循环,执行:
希望对你有帮助。
【讨论】:
【参考方案9】:使用objective-c 版本的protocol-buffers,我遇到了这个确切的问题。在从 iOS 客户端发送到使用 parseDelimitedFrom 的基于 Java 的服务器时,它期望长度作为第一个字节,我需要首先将 writeRawByte 调用到 CodedOutputStream。在这里发帖希望能帮助遇到这个问题的其他人。在解决这个问题时,人们会认为 Google proto-bufs 会带有一个简单的标志,可以为您执行此操作...
Request* request = [rBuild build];
[self sendMessage:request];
- (void) sendMessage:(Request *) request
//** get length
NSData* n = [request data];
uint8_t len = [n length];
PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream];
//** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly
[os writeRawByte:len];
[request writeToCodedOutputStream:os];
[os flush];
【讨论】:
【参考方案10】:由于我不允许将其写为对肯顿瓦尔达上述回答的评论;我相信他发布的代码中存在错误(以及已提供的其他答案)。以下代码:
...
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
...
设置了不正确的限制,因为它没有考虑已经从输入中读取的 varint32 的大小。这可能会导致数据丢失/损坏,因为从流中读取的附加字节可能是下一条消息的一部分。正确处理此问题的通常方法是删除用于读取大小的 CodedInputStream 并创建一个新的用于读取有效负载:
...
uint32_t size;
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
if (!input.ReadVarint32(&size)) return false;
google::protobuf::io::CodedInputStream input(rawInput);
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
...
【讨论】:
只有在大小前缀包含它自己的大小时才会这样,但事实并非如此。如果你这样做,你最终不会阅读整个消息。 正是因为大小前缀不包含自己的大小,才会出现这个问题。 大小前缀正好包含消息的大小,跟随它。然后代码继续读取包含整个消息的那么多字节。问题出在哪里? 原始代码和我发布的版本都可以正常工作,事实证明这毕竟不是我的问题。我的问题是 CodedInputStream 意外消耗源缓冲区中的所有数据,即使已设置限制。我试图确定剩余的数据量,而 CodedInputStream 使这变得非常困难。在 C# 中,这个问题帮助我弄清楚了:***.com/questions/33733913/…【参考方案11】:您可以使用 getline 从流中读取字符串,使用指定的分隔符:
istream& getline ( istream& is, string& str, char delim );
(在标题中定义)
【讨论】:
不是一回事;协议缓冲区是二进制格式,“分隔”函数实际上只是在前面加上一个大小。我需要知道大小前缀的格式。以上是关于Java 中协议缓冲区分隔的 I/O 函数是不是有 C++ 等效项?的主要内容,如果未能解决你的问题,请参考以下文章
JAVA-初步认识-I/O流(字符流-缓冲区-复制文本文件)