在 Java 中读取 protobuf 消息时出现异常
Posted
技术标签:
【中文标题】在 Java 中读取 protobuf 消息时出现异常【英文标题】:Exceptions when reading protobuf messages in Java 【发布时间】:2011-07-02 15:19:32 【问题描述】:我已经使用 protobuf 几个星期了,但在 Java 中解析 protobuf 消息时我仍然不断收到异常。
我使用 C++ 创建我的 protobuf 消息并使用 boost sockets 将它们发送到 Java 客户端正在侦听的服务器套接字。用于传输消息的 C++ 代码是这样的:
boost::asio::streambuf b;
std::ostream os(&b);
ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
CodedOutputStream *coded_output = new CodedOutputStream(raw_output);
coded_output->WriteVarint32(agentMessage.ByteSize());
agentMessage.SerializeToCodedStream(coded_output);
delete coded_output;
delete raw_output;
boost::system::error_code ignored_error;
boost::asio::async_write(socket, b.data(), boost::bind(
&MessageService::handle_write, this,
boost::asio::placeholders::error));
如你所见,我用WriteVarint32
写了消息的长度,因此Java 端应该知道使用parseDelimitedFrom
应该读多远:
AgentMessage agentMessage = AgentMessageProtos.AgentMessage
.parseDelimitedFrom(socket.getInputStream());
但这无济于事,我不断收到这些异常:
Protocol message contained an invalid tag (zero).
Message missing required fields: ...
Protocol message tag had invalid wire type.
Protocol message end-group tag did not match expected tag.
While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
重要要知道,这些异常不会在每条消息上都抛出。这只是我收到最多的消息的一小部分,效果很好 - 我仍然想解决这个问题,因为我不想忽略这些消息。
如果有人可以帮助我或将他的想法付诸实践,我将非常感激。
另一个有趣的事实是我收到的消息数量。我的程序通常在 2 秒内总共有 1.000 条消息。在 20 秒内大约 100.000 等等。我人为地减少了发送的消息,当只发送 6-8 条消息时,完全没有错误。那么这可能是Java客户端套接字端的缓冲问题吗?
假设有 60.000 条消息,其中平均有 5 条已损坏。
【问题讨论】:
这可能是一个愚蠢的问题,但有没有什么办法让您在数据中留下填充/超大缓冲区,而不是修剪任何盈余? (这个错误肯定很容易由备用零引起) @Marc-Gravell:什么是超大缓冲区?实际上,我不明白您认为可能导致这种情况的原因。也许你可以指出我应该在哪里寻找这个?顺便提一句。我还添加了一些我收到的其他例外情况。 我见过的其他人经常这样做 - 使用编码尝试将 BLOB 获取为字符串,然后解码.保证损坏(如果你想要一个字符串,而不是 UTF/codepage/etc,应该使用 base-64 或 hex 等) "Java 客户端套接字端" - 好吧,我更容易期待流处理代码中的一个错误,尤其是在异步/线程的情况下。可以是读也可以是写,在这种情况下......降低速率显然会消除许多由不正确的异步引起的意外冲突。 【参考方案1】:[我不是真正的 TCP 专家,这可能有点离题]
问题是,[Java] TCP Socket 的read(byte[] buffer)
会在读取到 TCP 帧结束后返回。如果这恰好是消息中间(我的意思是,protobuf 消息),解析器将阻塞并抛出 InvalidProtocolBufferException
。
任何 protobuf 解析调用在内部使用 CodedInputStream
(src here),如果源是 InputStream
,则依赖于 read()
——因此,受 TCP 套接字问题的影响。
因此,当您通过套接字填充大量数据时,一些消息势必会被分成两帧 - 这就是它们被破坏的地方。
我猜,当您降低消息传输率(如您所说的每秒 6-8 条消息)时,每帧都会在下一个数据块放入流之前发送,因此每条消息总是有自己的TCP 帧,即没有被拆分并且不会出错。 (或者也许只是错误很少见,而且发生率低只是意味着您需要更多时间才能看到它们)
至于解决方案,您最好自己处理缓冲区,即从套接字读取byte[]
(可能使用readFully()
而不是read()
,因为前者会阻塞,直到有足够的数据填充缓冲区[或遇到 EOF],因此它有点抵抗中间消息帧结束的事情),确保它有足够的数据来解析成整个消息,然后将缓冲区提供给解析器。
此外,this Google Groups topic 中也有一些关于该主题的好读物——这就是我得到 readFully()
部分的地方。
【讨论】:
【参考方案2】:我不熟悉 Java API,但我想知道 Java 如何处理表示消息长度的 uint32 值,因为 Java 只有带符号的 32 位整数。快速查看 Java API 参考告诉我,一个无符号的 32 位值存储在一个有符号的 32 位变量中。那么如何处理无符号 32 位值表示消息长度的情况呢?此外,Java 实现中似乎支持 varint 有符号整数。它们被称为 ZigZag32/64。 AFAIK,C++ 版本不知道这种编码。那么,您的问题的原因可能与这些事情有关?
【讨论】:
也许吧,但我想这种情况每次都会发生,因为这些异常只是有时才会触发,我不确定。以上是关于在 Java 中读取 protobuf 消息时出现异常的主要内容,如果未能解决你的问题,请参考以下文章
使用管理 API 将 Protobuf 消息发布到 Pulsar 模式注册表时出现 500 错误
Protobuf-net 使用嵌套数据反序列化时出现无效的线型异常(C++ 到 C#)
Jmeter protobuf 测试。无法读取 Protobuf 消息