在 Java 中读取 protobuf 消息时出现异常

Posted

技术标签:

【中文标题】在 Java 中读取 protobuf 消息时出现异常【英文标题】:Exceptions when reading protobuf messages in Java 【发布时间】:2011-07-02 15:19:32 【问题描述】:

我已经使用 protobuf 几个星期了,但在 Java 中解析 protobuf 消息时我仍然不断收到异常。

我使用 C++ 创建我的 protobuf 消息并使用 boost sockets 将它们发送到 Java 客户端正在侦听的服务器套接字。用于传输消息的 C++ 代码是这样的:

boost::asio::streambuf b;
std::ostream os(&b);

ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
CodedOutputStream *coded_output = new CodedOutputStream(raw_output);

coded_output->WriteVarint32(agentMessage.ByteSize());
agentMessage.SerializeToCodedStream(coded_output);

delete coded_output;
delete raw_output;

boost::system::error_code ignored_error;

boost::asio::async_write(socket, b.data(), boost::bind(
        &MessageService::handle_write, this,
        boost::asio::placeholders::error));

如你所见,我用WriteVarint32 写了消息的长度,因此Java 端应该知道使用parseDelimitedFrom 应该读多远:

AgentMessage agentMessage = AgentMessageProtos.AgentMessage    
                                .parseDelimitedFrom(socket.getInputStream());

但这无济于事,我不断收到这些异常:

Protocol message contained an invalid tag (zero).
Message missing required fields: ...
Protocol message tag had invalid wire type.
Protocol message end-group tag did not match expected tag.
While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either than the input has been truncated or that an embedded message misreported its own length.

重要要知道,这些异常不会在每条消息上都抛出。这只是我收到最多的消息的一小部分,效果很好 - 我仍然想解决这个问题,因为我不想忽略这些消息。

如果有人可以帮助我或将他的想法付诸实践,我将非常感激。


另一个有趣的事实是我收到的消息数量。我的程序通常在 2 秒内总共有 1.000 条消息。在 20 秒内大约 100.000 等等。我人为地减少了发送的消息,当只发送 6-8 条消息时,完全没有错误。那么这可能是Java客户端套接字端的缓冲问题吗?

假设有 60.000 条消息,其中平均有 5 条已损坏。

【问题讨论】:

这可能是一个愚蠢的问题,但有没有什么办法让您在数据中留下填充/超大缓冲区,而不是修剪任何盈余? (这个错误肯定很容易由备用零引起) @Marc-Gravell:什么是超大缓冲区?实际上,我不明白您认为可能导致这种情况的原因。也许你可以指出我应该在哪里寻找这个?顺便提一句。我还添加了一些我收到的其他例外情况。 我见过的其他人经常这样做 - 使用编码尝试将 BLOB 获取为字符串,然后解码.保证损坏(如果你想要一个字符串,而不是 UTF/codepage/etc,应该使用 base-64 或 hex 等) "Java 客户端套接字端" - 好吧,我更容易期待流处理代码中的一个错误,尤其是在异步/线程的情况下。可以是读也可以是写,在这种情况下......降低速率显然会消除许多由不正确的异步引起的意外冲突。 【参考方案1】:

[我不是真正的 TCP 专家,这可能有点离题]

问题是,[Java] TCP Socket 的read(byte[] buffer) 会在读取到 TCP 帧结束后返回。如果这恰好是消息中间(我的意思是,protobuf 消息),解析器将阻塞并抛出 InvalidProtocolBufferException

任何 protobuf 解析调用在内部使用 CodedInputStream (src here),如果源是 InputStream,则依赖于 read()——因此,受 TCP 套接字问题的影响。

因此,当您通过套接字填充大量数据时,一些消息势必会被分成两帧 - 这就是它们被破坏的地方。

我猜,当您降低消息传输率(如您所说的每秒 6-8 条消息)时,每帧都会在下一个数据块放入流之前发送,因此每条消息总是有自己的TCP 帧,即没有被拆分并且不会出错。 (或者也许只是错误很少见,而且发生率低只是意味着您需要更多时间才能看到它们)

至于解决方案,您最好自己处理缓冲区,即从套接字读取byte[](可能使用readFully() 而不是read(),因为前者会阻塞,直到有足够的数据填充缓冲区[或遇到 EOF],因此它有点抵抗中间消息帧结束的事情),确保它有足够的数据来解析成整个消息,然后将缓冲区提供给解析器。

此外,this Google Groups topic 中也有一些关于该主题的好读物——这就是我得到 readFully() 部分的地方。

【讨论】:

【参考方案2】:

我不熟悉 Java API,但我想知道 Java 如何处理表示消息长度的 uint32 值,因为 Java 只有带符号的 32 位整数。快速查看 Java API 参考告诉我,一个无符号的 32 位值存储在一个有符号的 32 位变量中。那么如何处理无符号 32 位值表示消息长度的情况呢?此外,Java 实现中似乎支持 varint 有符号整数。它们被称为 ZigZag32/64。 AFAIK,C++ 版本不知道这种编码。那么,您的问题的原因可能与这些事情有关?

【讨论】:

也许吧,但我想这种情况每次都会发生,因为这些异常只是有时才会触发,我不确定。

以上是关于在 Java 中读取 protobuf 消息时出现异常的主要内容,如果未能解决你的问题,请参考以下文章

使用管理 API 将 Protobuf 消息发布到 Pulsar 模式注册表时出现 500 错误

Protobuf-net 使用嵌套数据反序列化时出现无效的线型异常(C++ 到 C#)

Jmeter protobuf 测试。无法读取 Protobuf 消息

Angular 10 + PubNub - 从特定频道读取消息时出现问题

使用 protobuf.net 序列化图形时出现问题

在便携式库上安装 protobuf-net 时出现 nuget 错误