如何“连接”或“组合”或“加入”一系列“二进制”序列化字节数组? [复制]

Posted

技术标签:

【中文标题】如何“连接”或“组合”或“加入”一系列“二进制”序列化字节数组? [复制]【英文标题】:How to "concatenate" or "combine" or "join" a series of 'binarily' serialized byte arrays? [duplicate] 【发布时间】:2012-11-25 15:40:33 【问题描述】:

可能重复:Is the received stream from a socket limited to a single send command?

注意:我认为这个问题非常复杂(希望不是针对你们,这就是我在这里问的原因,哈哈),我尽力解释得尽可能简单明了。

在我的应用程序中,我不断在固定大小的缓冲区中接收字节数组。

我收到的这些字节数组系列已被“二进制”序列化。

但是,有时接收到的字节数组会大于固定大小的缓冲区,因此我需要将当前接收到的字节数组存储到容器中并再次循环以接收剩余的字节数组。

我现在的问题是如何“连接”或“组合”或“加入”我收到的所有“批次”字节数组(并存储在容器中,可能是字节数组队列)以形成单个字节数组然后反序列化它们?

int bytesRead = client.EndReceive(ar);
if (bytesRead > 0)
    
        // There might be more data, so store the data received so far.
        // If the buffer was not filled, I have to get the number of bytes received as Thorsten Dittmar was saying, before queuing it
        dataReceivedQueue.Enqueue(state.buffer);

        // Get the rest of the data.
        client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
        new AsyncCallback(ReceiveCallback_onQuery), state);
    
else

    // All the data has arrived; put it in response.
    response_onQueryHistory = ByteArrayToObject(functionThatCombinesBytes(dataReceivedQueue));

    // Signal that all bytes have been received.
    receiveDoneQuery.Set();

state.buffer 是接收数据的缓冲区。 buffer 是一个大小为 4096 的字节数组。 state 是 StateObject 类型。

ByteArrayToObject(byte []) 负责反序列化接收到的数据并将其转换回其对象形式

functionThatCombinesBytes(Queue) 这个函数将接收一个字节队列并将所有字节“组合”成一个字节数组

【问题讨论】:

这个问题几乎每天都会出现。 “问题”是 TCP 是流式传输的,很多人希望套接字能够交换消息。他们没有,他们交换任意大小的字节数组。您需要一个显示“这里有 N 个字节” 或某种分隔符的协议(“这是消息,KTNXBYE”)。例如,请参阅:Is the received stream from a socket limited to a single send command?。对于简单的附加字节数组,使用Buffer.BlockCopy() 【参考方案1】:

仅仅因为您使用特定大小的缓冲区调用BeginReceive,并不意味着它一定会完全填满缓冲区,因此很可能您的一些排队缓冲区实际上只会部分填充收到数据,其余为零,如果您只是将它们连接在一起,这几乎肯定会破坏您的组合流,因为您没有同时存储实际读入缓冲区的字节数。您似乎每次都在重用相同的缓冲区,因此您只会用新数据覆盖已读取的数据。

因此,我建议将您的 dataReceivedQueue 替换为 MemoryStream,并使用类似:

if (bytesRead > 0)
    
        // There might be more data, so store the data received so far.
        memoryStream.Write(state.buffer, 0, bytesRead);

        // Get the rest of the data.
        client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
        new AsyncCallback(ReceiveCallback_onQuery), state);
    
else

    // All the data has arrived; put it in response.
    response_onQueryHistory = ByteArrayToObject(memoryStream.ToArray());

    // Signal that all bytes have been received.
    receiveDoneQuery.Set();

【讨论】:

你还是需要考虑到队列中的字节数组可能没有被填满。 @ThorstenDittmar 我最初只是简单地回答了被问到的问题(如何将一组字节数组连接成一个数组),但很明显还有许多其他问题,所以完全重写了答案。 感谢 Iridium 和 Thorsten Dittmar 指出可能的部分填充字节数组 工作就像@Iridium 的魅力,谢谢! @iridium 内存流,如果在接收回调例程中声明和使用,是否不会在每次调用时重写【参考方案2】:

首先,除非您的dataReceivedQueue 的类型实现了它自己的(或覆盖Queue 的)Enqueue 方法,否则您的state.buffer 将被每个client.BeginReceive 调用重写。

您可以简单地将 MemoryStream 成员添加到您的 StateObject 并在它们到来时附加字节:

state.rawData.Seek(0, SeekOrigin.End);
state.rawData.Write(state.buffer, 0, bytesRead);

【讨论】:

是的,每个客户端都会重写状态缓冲区。BeginReceive 调用以重新使用分配的内存。在接收到内存流中接收到的所有字节后,我只需要将内存流内容转换为字节数组? 是的,你可以ToArray()它,Read它的内容到一个现有的数组,CopyTo另一个MemoryStream。或者你可以从 MemoryStream 实现一个对象反序列化器。【参考方案3】:

首先,你不仅需要存储字节数组,还需要存储数组中实际有效的字节数。例如,每次接收可能不会完全填满缓冲区,因此会返回字节数(代码中的bytesRead)。

如果你有这个,你可以通过总结每个“批次”接收到的字节数来计算最终缓冲区的大小。

之后,您可以 - 在循环中 - 使用 Array.Copy 将“批次”以指定长度复制到目标数组的指定位置。

例如,这可能如下所示:

// Batch is a class that contains the batch byte buffer and the number of bytes valid
int destinationPos = 0;
byte[] destination = new byte[<number of bytes in total>];
foreach (Batch b in batches)

    Array.Copy(b.Bytes, 0, destination, destinationPos, b.ValidLength);

【讨论】:

以上是关于如何“连接”或“组合”或“加入”一系列“二进制”序列化字节数组? [复制]的主要内容,如果未能解决你的问题,请参考以下文章

Stripe:如何在一系列价格中保留付款

在谷歌大查询中获得完全加入,在大查询中保持所有频率组合,让我只为所有类型的加入提供左加入

SQL:联合或自加入

组合连接表后,然后在其他表中保存/追加

在 dplyr 中,如何通过可能存在或不存在的列连接数据框?

如何合并多维数据框和不同长度的系列?