Parse sequences of protobuf messages from contiguous chunks of a fixed-size byte buffer

【Title】: Parse sequences of protobuf messages from contiguous chunks of a fixed-size byte buffer 【Posted】: 2015-03-20 02:38:43 【Question】:

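My limited C++ knowledge has had me struggling with this for two days. What I need to do is parse a sequence of messages from a large file using the protobuf C++ API; the file may contain millions of such messages. Reading directly from the file is easy, because I can always call ReadVarint32 to get the size, push that size as a limit onto the CodedInputStream, and then call ParseFromCodedStream, as described in this post. However, the I/O-level API I am using (libuv, actually) requires a fixed-size buffer to be allocated for every read callback. Obviously that block size has nothing to do with the size of the messages I am reading.

This makes my life hard. Every time I read from the file and fill the fixed-size buffer (say 16K), that buffer may contain hundreds of complete protobuf messages, but the last piece of the buffer is likely to be an incomplete message. So I thought: fine, I should read as many messages as I can, then take the leftover piece and put it at the beginning of the next 16K buffer I read, and continue until I reach EOF. I use ReadVarint32() to get the size, compare that number with the number of bytes left in the buffer, and keep reading as long as the message size is smaller.

There is an API called GetDirectBufferPointer, so I tried using it to record the pointer position before I even read the next message's size. But I suspect that, because of some endianness weirdness, if I just take the rest of the byte array from where the pointer starts and prepend it to the next chunk, the parse will not succeed; in fact the first few bytes (8, I think) are completely messed up.

Alternatively, if I call codedStream.ReadRaw() to write the remaining stream into a buffer and then prepend that to the new chunk, the data is not corrupted. The problem is that this time I lose the "size" bytes, because they were already consumed by ReadVarint32! Even if I simply remember the size I read last time and call message.ParseFromCodedStream() directly in the next iteration, it ends up reading one byte too few, parts of the data are corrupted, and the object cannot be restored.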

std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
char bResidueBuffer[READ_BUFFER_SIZE];
char temp[READ_BUFFER_SIZE];
google::protobuf::uint32 size;
//"in" is the file input stream
while (in.good()) {
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
    mCheckBuffer.clear();
    //merge the last remaining chunk that contains incomplete message with
    //the new data chunk I got out from buffer. Excuse my terrible C++ foo
    std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),
               mReadBuffer.begin(), mReadBuffer.end(),
               std::back_inserter(mCheckBuffer));

    //Treat the new merged buffer array as the new CIS
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0],
                                               mCheckBuffer.size());
    google::protobuf::io::CodedInputStream cis(&ais);
    //Record the pointer location on CIS in bResidueBuffer
    cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
                               &bResidueBufSize);

    //No size information, probably first time or last iteration
    //coincidentally read a complete message out. Otherwise I simply
    //skip reading size again as I've already populated that from last
    //iteration when I got an incomplete message
    if (size == 0) {
        cis.ReadVarint32(&size);
    }
    //Have to read this again to get remaining buffer size
    cis.GetDirectBufferPointer((const void**)&temp, &mResidueBufSize);

    //Compare the next message size with how much left in the buffer, if
    //message size is smaller, I know I can read at least one more message
    //out, keep reading until I run out of buffer, or, it's the end of message
    //and my buffer just allocated larger so size should be 0
    while (size <= mResidueBufSize && size != 0) {
        //If this cis I constructed didn't have the size info at the beginning,
        //and I just read straight from it hoping to get the message out from
        //the "size" I got from last iteration, it simply doesn't work
        //(read one less byte in fact, and some part of the message corrupted)
        //push the size constraint to the input stream;
        int limit = cis.PushLimit(size);
        //parse message from the input stream
        message.ParseFromCodedStream(&cis);
        cis.PopLimit(limit);
        google::protobuf::TextFormat::PrintToString(message, &str);
        printf("%s", str.c_str());
        //do something with the parsed object
        //Now I have to record the new pointer location again
        cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
                                   &bResidueBufSize);
        //Read another time the next message's size and go back to while loop check
        cis.ReadVarint32(&size);
    }

    //If I do the next line, bResidueBuffer will have the correct CIS information
    //copied over, but not having the "already read" size info
    cis.ReadRaw(bResidueBuffer, bResidueBufSize);
    mResidueBuffer.clear();
    //I am constructing a new vector that receives the residual chunk of the
    //current buffer that isn't enough to restore a message
    //If I don't do ReadRaw, this copy completely messes up at least the first 8
    //bytes of the copied buffer's value, due to I suspect endianness
    mResidueBuffer.insert(mResidueBuffer.end(), &bResidueBuffer[0],
                          &bResidueBuffer[bResidueBufSize]);
}

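I'm really out of ideas now. Is it possible to use protobuf gracefully with an API that requires a fixed-size intermediate buffer? Any input is much appreciated, thanks!

【Discussion】:

【Answer 1】:

I see two major problems with your code: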

std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),  
mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));

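It looks like you expect std::merge to concatenate your buffers, but this function actually merges two sorted ranges into a single sorted range, in the merge-sort sense. That makes no sense here; mCheckBuffer will end up containing garbage.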
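
To make the difference concrete, here is a minimal sketch (the helper names `concat` and `merged` are hypothetical, not from the question) that contrasts plain concatenation via `insert` with what `std::merge` does to two sorted inputs:

```cpp
#include <algorithm>
#include <iterator>
#include <string>
#include <vector>

// Plain concatenation: every byte of `head`, then every byte of `tail`,
// in their original order. This is what chunk reassembly needs.
std::vector<char> concat(const std::vector<char>& head,
                         const std::vector<char>& tail) {
    std::vector<char> out;
    out.reserve(head.size() + tail.size());
    out.insert(out.end(), head.begin(), head.end());
    out.insert(out.end(), tail.begin(), tail.end());
    return out;
}

// What std::merge does: interleaves two sorted ranges by value, so the
// original byte order of the two buffers is not preserved.
std::vector<char> merged(const std::vector<char>& a,
                         const std::vector<char>& b) {
    std::vector<char> out;
    std::merge(a.begin(), a.end(), b.begin(), b.end(),
               std::back_inserter(out));
    return out;
}
```

With the inputs {'b','d'} and {'a','c'}, `concat` yields "bdac" while `merged` yields "abcd". For serialized data, only the first result keeps the bytes meaningful.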

cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
&bResidueBufSize);

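Here you are casting &bResidueBuffer to an incompatible pointer type. bResidueBuffer is a char array, so &bResidueBuffer is a pointer to a char array, not a pointer to a pointer. This is admittedly confusing, because an array can be implicitly converted to a pointer (one that points to the array's first element), but that really is a conversion: bResidueBuffer itself is not a pointer, it can merely be converted to one.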
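
The type mismatch can be checked at compile time. The following is only an illustrative sketch; `kBufSize`, `residue` and `ptr` are hypothetical stand-ins for the question's READ_BUFFER_SIZE, bResidueBuffer and a proper out-parameter:

```cpp
#include <type_traits>

constexpr int kBufSize = 16;   // hypothetical stand-in for READ_BUFFER_SIZE
char residue[kBufSize];        // an array: storage for 16 chars, not a pointer
const void* ptr = nullptr;     // an actual pointer variable

// Taking the address of an array yields "pointer to array of 16 char".
static_assert(std::is_same<decltype(&residue), char (*)[kBufSize]>::value,
              "&residue is a pointer to the whole array");

// Taking the address of a pointer yields the pointer-to-pointer type that
// an out-parameter of type const void** expects.
static_assert(std::is_same<decltype(&ptr), const void**>::value,
              "&ptr is a genuine pointer-to-pointer");

// The two types differ, which is why the C-style cast in the question only
// silences the compiler instead of making the call correct.
static_assert(!std::is_same<decltype(&residue), const void**>::value,
              "the cast hides a real type mismatch");
```

Because the cast compiles, a callee that writes a pointer through that argument would overwrite the first few bytes of the array itself, which would fit the "first 8 bytes are messed up" symptom on a 64-bit build.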

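I think you have also misunderstood what GetDirectBufferPointer() does. It looks like you expect it to copy the rest of the buffer into bResidueBuffer, but the method never copies any data. It returns a pointer into the original buffer.

The correct call looks like this: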

const void* ptr;
int size;
cis.GetDirectBufferPointer(&ptr, &size);

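Now ptr points into the original buffer. You can compare it against a pointer to the start of the buffer to find out where you are in the stream, for example: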

size_t pos = (const char*)ptr - &mCheckBuffer[0];

However, you shouldn't do that, because CodedInputStream already has a CurrentPosition() method for this purpose. It returns the current byte offset in the buffer, so use that instead.
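
The offset-based approach can be sketched without protobuf at all. The code below is an illustration under stated assumptions, not the library's API: `read_varint32` and `extract_frames` are hypothetical helpers that hand-decode the base-128 varint length prefix. The key idea is to commit the offset only after a whole frame is available, so the leftover tail always starts at a length prefix and the size bytes are never lost.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Decodes a base-128 varint starting at `pos`. Returns false if the buffer
// ends before the varint is complete (or the varint is longer than 5 bytes);
// `pos` and `value` are only updated on success.
bool read_varint32(const std::vector<char>& buf, std::size_t& pos,
                   std::uint32_t& value) {
    std::uint32_t result = 0;
    std::size_t p = pos;
    for (int shift = 0; shift < 35 && p < buf.size(); shift += 7) {
        const std::uint8_t byte = static_cast<std::uint8_t>(buf[p++]);
        result |= static_cast<std::uint32_t>(byte & 0x7F) << shift;
        if ((byte & 0x80) == 0) {   // high bit clear: last byte of the varint
            pos = p;
            value = result;
            return true;
        }
    }
    return false;
}

// Appends the payload of every complete length-prefixed frame in `buf` to
// `frames` and returns the offset of the first unconsumed byte.
std::size_t extract_frames(const std::vector<char>& buf,
                           std::vector<std::string>& frames) {
    std::size_t pos = 0;
    for (;;) {
        std::size_t next = pos;     // tentative cursor, not yet committed
        std::uint32_t size = 0;
        if (!read_varint32(buf, next, size)) break;  // prefix is cut off
        if (buf.size() - next < size) break;         // payload is cut off
        frames.emplace_back(buf.data() + next, size);
        pos = next + size;          // commit only after a whole frame
    }
    return pos;
}
```

A caller would keep the bytes from the returned offset to the end of the buffer, append the next chunk to them, and call `extract_frames` again. Each extracted payload could then be handed to the message's ParseFromString() or ParseFromArray().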

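【Discussion】:

Thanks! I'll read the rest of your comments more carefully... std::merge was definitely a great catch! I knew about the CurrentPosition() method, but I need to copy data, so I need to get the pointer out. Also, I wasn't trying to use GetDirectBufferPointer() to copy data; what I was trying to do was only get the pointer and size information and then do the actual copy by calling ReadRaw and then insert. That is where I hit my biggest problem, because by then the "size" bytes are already gone and don't get copied.

cis.CurrentPosition() returns the position within mCheckBuffer. For example, you can get a pointer by writing &mCheckBuffer[cis.CurrentPosition()]. So you don't need to use GetDirectBufferPointer().

Wait, I completely forgot that the whole CIS is still backed by mCheckBuffer! That way I should be able to get an arbitrary index and copy the buffer! Let me do some more research on the rest... maybe there is a way to keep the size information that has already been read... Thanks a lot!

@Superziyi Also don't forget that your buffer may end in the middle of the size itself, so you really need to check the return value when reading the size. (ReadVarint32() returns false if it could not read a complete varint.)

Presumably in.read() returns the number of bytes actually read. You should then treat the read buffer as being only that long, so you don't read garbage past the end.

【Answer 2】: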

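OK, thanks to Kenton for pointing out the main problems in my question, I have now revised the code snippet and tested it, and I'm posting my solution here. That said, I'm not happy with all the complexity and edge-case checking I need to do here; I think it is error-prone. Even so, what I will probably really do is put a direct, blocking "read from stream" call in a thread other than my libuv main thread, so that I don't need to go through the libuv API. But for completeness, here is my code: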

std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
std::vector<char> mReadBuffer(READ_BUFFER_SIZE);
google::protobuf::uint32 size = 0;
//"in" is the file input stream
while (in.good()) {
    //This part is tricky as you're not guaranteed that what end up in
    //mReadBuffer is everything you read out from the file. The same
    //happens with libuv's assigned buffer, after EOF, what's rest in
    //the buffer could be anything
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
    //merge the last remaining chunk that contains incomplete message with
    //the new data chunk I got out from buffer. I couldn't find a more
    //efficient way doing that
    mCheckBuffer.clear();
    mCheckBuffer.reserve(mResidueBuffer.size() + mReadBuffer.size());
    mCheckBuffer.insert(mCheckBuffer.end(), mResidueBuffer.begin(),
                        mResidueBuffer.end());
    mCheckBuffer.insert(mCheckBuffer.end(), mReadBuffer.begin(),
                        mReadBuffer.end());
    //Treat the new merged buffer array as the new CIS
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0],
                                               mCheckBuffer.size());
    google::protobuf::io::CodedInputStream cis(&ais);
    //No size information, probably first time or last iteration
    //coincidentally read a complete message out. Otherwise I simply
    //skip reading size again as I've already populated that from last
    //iteration when I got an incomplete message
    if (size == 0) {
        cis.ReadVarint32(&size);
    }
    bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition();
    //Compare the next message size with how much left in the buffer, if
    //message size is smaller, I know I can read at least one more message
    //out, keep reading until I run out of buffer. If, it's the end of message
    //and size (next byte I read from stream) happens to be 0, that
    //will trip me up, cos when I push size 0 into PushLimit and then try
    //parsing, it will actually return true even if it reads nothing.
    //So I can get into an infinite loop, if I don't do the check here
    while (size <= bResidueBufSize && size != 0) {
        //If this cis I constructed didn't have the size info at the
        //beginning, and I just read straight from it hoping to get the
        //message out from the "size" I got from last iteration
        //push the size constraint to the input stream
        int limit = cis.PushLimit(size);
        //parse the message from the input stream
        bool result = message.ParseFromCodedStream(&cis);
        //Parse fail, it could be because last iteration already took care
        //of the last message and that size I read last time is just junk
        //I choose to only check EOF here when result is not true, (which
        //leads me to having to check for size=0 case above), cos it will
        //be too many checks if I check it everytime I finish reading a
        //message out
        if (!result) {
            if (in.eof()) {
                log.info("Reached EOF, stop processing!");
                break;
            } else {
                log.error("Read error or input mal-formatted! Log error!");
                exit(1);
            }
        }
        cis.PopLimit(limit);
        google::protobuf::TextFormat::PrintToString(message, &str);
        //Do something with the message

        //This is when the last message read out exactly reach the end of
        //the buffer and there is no size information available on the
        //stream any more, in which case size will need to be reset to zero
        //so that the beginning of next iteration will read size info first
        if (!cis.ReadVarint32(&size)) {
            size = 0;
        }
        bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition();
    }
    if (in.eof()) {
        break;
    }
    //Now I am copying the residual buffer into the intermediate
    //mResidueBuffer, which will be merged with newly read data in next iteration
    mResidueBuffer.clear();
    mResidueBuffer.reserve(bResidueBufSize);
    mResidueBuffer.insert(mResidueBuffer.end(),
                          &mCheckBuffer[cis.CurrentPosition()],
                          &mCheckBuffer[mCheckBuffer.size()]);
}
if (!in.eof()) {
    log.error("Something else other than EOF happened to the file, log error!");
    exit(1);
}

【Discussion】:
