C# Begin/EndReceive - 我如何读取大数据?

Posted

技术标签:

【中文标题】C# Begin/EndReceive - 我如何读取大数据?【英文标题】:C# Begin/EndReceive - how do I read large data? 【发布时间】:2010-10-09 14:43:33 【问题描述】:

当以 1024 块的形式读取数据时,如何继续从接收到大于 1024 字节的消息的套接字读取数据,直到没有数据为止?我是否应该只使用 BeginReceive 来读取数据包的长度前缀,然后一旦检索到,使用 Receive() (在异步线程中)来读取数据包的其余部分?还是有别的办法?

编辑:

我认为 Jon Skeet 的链接有解决方案,但该代码有一点减速带。我使用的代码是:

public class StateObject

    public Socket workSocket = null;
    public const int BUFFER_SIZE = 1024;
    public byte[] buffer = new byte[BUFFER_SIZE];
    public StringBuilder sb = new StringBuilder();


public static void Read_Callback(IAsyncResult ar)

    StateObject so = (StateObject) ar.AsyncState;
    Socket s = so.workSocket;
    
    int read = s.EndReceive(ar);
    
    if (read > 0) 
    
        so.sb.Append(Encoding.ASCII.GetString(so.buffer, 0, read));

        if (read == StateObject.BUFFER_SIZE)
        
            s.BeginReceive(so.buffer, 0, StateObject.BUFFER_SIZE, 0, 
                    new AyncCallback(Async_Send_Receive.Read_Callback), so);
            return;
        
    
    
    if (so.sb.Length > 0)
    
        //All of the data has been read, so displays it to the console
        string strContent;
        strContent = so.sb.ToString();
        Console.WriteLine(String.Format("Read 0 byte from socket" + 
        "data = 1 ", strContent.Length, strContent));
    
    s.Close();

现在这个修正在大多数情况下都可以正常工作,但是当数据包的大小是缓冲区的倍数时它会失败。这样做的原因是,如果缓冲区在读取时被填满,则假定有更多数据;但同样的问题发生在以前。例如,一个 2 字节缓冲区在 4 字节数据包上被填充两次,并假设有更多数据。然后它会阻塞,因为没有什么可读取的了。 问题是接收函数不知道数据包何时结束。


这让我想到了两种可能的解决方案:我可以有一个数据包结束分隔符,或者我可以读取数据包标头以找到长度,然后准确接收该长度(正如我最初建议的那样)。

不过,这些都存在问题。我不喜欢使用分隔符的想法,因为用户可以以某种方式将其放入应用程序输入字符串中的数据包中并将其搞砸。这对我来说似乎也有点草率。

长度标头听起来不错,但我正计划使用协议缓冲区 - 我不知道数据的格式。有长度标题吗?它是多少字节?这会是我自己实现的吗?等等。

我该怎么办?

【问题讨论】:

【参考方案1】:

这是一个非常古老的话题,但我来到这里寻找其他东西并找到了这个:

现在这个修正在大多数情况下都可以正常工作,但是当数据包的大小是缓冲区的倍数时它会失败。原因是如果缓冲区在读取时被填满,则假定为有更多的数据;但同样的问题发生在以前。例如,一个 2 字节缓冲区在 4 字节数据包上被填充两次,并假设有更多数据。然后它会阻塞,因为没有什么可读取的了。问题是接收函数不知道数据包何时结束。

我遇到了同样的问题,由于似乎没有一个回复可以解决这个问题,所以我使用Socket.Available

public static void Read_Callback(IAsyncResult ar)

    StateObject so = (StateObject) ar.AsyncState;
    Socket s = so.workSocket;

    int read = s.EndReceive(ar);    
    if (read > 0) 
    
        so.sb.Append(Encoding.ASCII.GetString(so.buffer, 0, read));

        if (s.Available == 0)
        
            // All data received, process it as you wish
        
    
    // Listen for more data
    s.BeginReceive(so.buffer, 0, StateObject.BUFFER_SIZE, 0, 
                new AyncCallback(Async_Send_Receive.Read_Callback), so);

希望这对其他人有所帮助,所以已经帮助了我很多次,谢谢大家!

【讨论】:

【参考方案2】:

我也遇到了同样的问题。

我测试了几次,发现有时候多个BeginReceive - EndReceive会导致丢包。 (此循环未正确结束)

就我而言,我使用了两种解决方案。

首先,我定义了足够的数据包大小,只做 1 次BeginReceive() ~ EndReceive();

其次,当我收到大量数据时,我使用NetworkStream.Read()而不是BeginReceive() - EndReceive()

异步socket不好用,需要对socket有很多了解。

【讨论】:

【参考方案3】:

这似乎有很多困惑。 MSDN 网站上使用 TCP 进行异步套接字通信的示例具有误导性,并且没有得到很好的解释。如果消息大小是接收缓冲区的整数倍,则 EndReceive 调用确实会阻塞。这将导致您永远不会收到您的消息并且应用程序挂起。

只是为了澄清 - 如果您使用 TCP,则必须为数据提供自己的分隔符。阅读以下内容(来自非常可靠的来源)。

对应用程序数据的需求 定界

TCP 处理的其他影响 作为流的传入数据是该数据 由应用程序使用 TCP 接收 是非结构化的。对于传输,一个 数据流在一个上进入 TCP 设备,并在接收时,流 数据返回到应用程序 接收设备。尽管 流被分成段 通过 TCP 传输,这些段 是隐藏的 TCP 级别的详细信息 从应用程序。所以,当一个 设备要发送多件 数据,TCP没有提供机制 指示“分界线”在哪里 是在碎片之间,因为 TCP 不检查的含义 数据。申请必须 提供一种方法来做到这一点。

以一个应用程序为例 即发送数据库记录。它 需要从 员工数据库表,后跟 记录#581 和记录#611。它发送 这些记录到 TCP,TCP 处理 它们都作为一股流 字节。 TCP 会打包这些字节 分段,但在某种程度上 应用程序无法预测。它是 有可能每个人都以 不同的细分市场,但更有可能 它们都将在一个段中,或者 每个的一部分将最终以不同的方式结束 段,取决于它们的长度。 记录本身必须有一些 某种明确的标记,所以 接收设备可以告诉你在哪里 记录结束,下一个开始。

来源:http://www.tcpipguide.com/free/t_TCPDataHandlingandProcessingStreamsSegmentsandSequ-3.htm

我在网上看到的大多数使用 EndReceive 的示例都是错误的或具有误导性的。在示例中它通常不会导致任何问题,因为只发送一条预定义的消息然后关闭连接。

【讨论】:

这来自哪里并不重要,这很幼稚。如果您正在实现基于 RFC 的基于 TCP 的协议处理程序,您将很少发现存在某种分隔符。相反,您通常必须依赖应用程序协议本身的内在长度和偏移量指标。【参考方案4】:

当。鉴于政要们已经参与进来,我什至都不愿意回答这个问题,但这里就可以了。温柔,伟大的人们!

如果没有阅读 Marc 的博客的好处(由于公司互联网政策,它在此处被屏蔽),我将提供“另一种方式”。

在我看来,诀窍是将数据的接收与数据的处理分开

我使用这样定义的 StateObject 类。它与 MSDN StateObject 实现的不同之处在于它不包含 StringBuilder 对象,BUFFER_SIZE 常量是私有的,并且为了方便,它包含了一个构造函数。

public class StateObject

    private const int BUFFER_SIZE = 65535;
    public byte[] Buffer = new byte[BUFFER_SIZE];
    public readonly Socket WorkSocket = null;

    public StateObject(Socket workSocket)
    
        WorkSocket = workSocket;
    

我还有一个 Packet 类,它只是一个缓冲区和时间戳的包装器。

public class Packet

    public readonly byte[] Buffer;
    public readonly DateTime Timestamp;

    public Packet(DateTime timestamp, byte[] buffer, int size)
    
        Timestamp = timestamp;
        Buffer = new byte[size];
        System.Buffer.BlockCopy(buffer, 0, Buffer, 0, size);
    

我的 ReceiveCallback() 函数如下所示。

public static ManualResetEvent PacketReceived = new ManualResetEvent(false);
public static List<Packet> PacketList = new List<Packet>();
public static object SyncRoot = new object();
public static void ReceiveCallback(IAsyncResult ar)

    try 
        StateObject so = (StateObject)ar.AsyncState;
        int read = so.WorkSocket.EndReceive(ar);

        if (read > 0) 
            Packet packet = new Packet(DateTime.Now, so.Buffer, read);
            lock (SyncRoot) 
                PacketList.Add(packet);
            
            PacketReceived.Set();
        

        so.WorkSocket.BeginReceive(so.Buffer, 0, so.Buffer.Length, 0, ReceiveCallback, so);
     catch (ObjectDisposedException) 
        // Handle the socket being closed with an async receive pending
     catch (Exception e) 
        // Handle all other exceptions
    

请注意,此实现绝对不处理接收到的数据,也不期望接收到多少字节。它只是简单地接收套接字上发生的任何数据(最多 65535 字节)并将该数据存储在数据包列表中,然后立即将另一个异步接收排队。

由于处理每个异步接收的线程不再进行处理,数据显然将由一个不同的线程处理,这就是为什么Add()操作是通过lock语句同步的。此外,处理线程(无论是主线程还是其他一些专用线程)需要知道何时有数据要处理。为此,我通常使用 ManualResetEvent,这就是我上面展示的内容。

这是处理的工作原理。

static void Main(string[] args)

    Thread t = new Thread(
        delegate() 
            List<Packet> packets;
            while (true) 
                PacketReceived.WaitOne();
                PacketReceived.Reset();
                lock (SyncRoot) 
                    packets = PacketList;
                    PacketList = new List<Packet>();
                

                foreach (Packet packet in packets) 
                    // Process the packet
                
            
        
    );
    t.IsBackground = true;
    t.Name = "Data Processing Thread";
    t.Start();

这是我用于所有套接字通信的基本基础设施。它在数据接收和数据处理之间提供了很好的分离。

关于您遇到的另一个问题,请务必记住,使用这种方法,每个 Packet 实例不一定代表您的应用程序上下文中的完整消息。一个 Packet 实例可能包含部分消息、一条消息或多条消息,并且您的消息可能跨越多个 Packet 实例。我已经解决了如何知道您何时收到您在here 发布的相关问题中的完整消息。

【讨论】:

您在正确的轨道上,但您还必须考虑到您的生产者将一个 整个 缓冲区放入列表中,以便完成每个接收。您必须考虑两种可能会毁掉您的情况:1) 缓慢的 WAN 网络涓流将在每个缓冲区中填充 10 个字节,并且您将在第一帧完成之前耗尽您的内存。和 2) 快速发送者压倒慢速接收者,因为接收者在处理之前接受了所有内容。对于 1),您需要合并低填充缓冲区。对于 2) 你需要 TCP 流控制,当 PacketList 变得太大时停止发布缓冲区。 错过了您的数据包仅复制了填充的缓冲区,因此我的反对意见 1) 不再适用(尽管现在我会反对副本...),但这仍然是 2)【参考方案5】:

有关信息(一般开始/结束用法),您可能想查看this blog post;这种方法对我来说效果很好,并且可以减轻很多痛苦...

【讨论】:

所以您在创建 TPL 之前已经构建了 Task.FromAsync。格拉茨!【参考方案6】:

您将首先阅读长度前缀。一旦你有了它,你就可以继续读取块中的字节(并且你可以做到这一点异步,正如你所推测的那样),直到你用尽了你知道的从网络中传入的字节数。

请注意,在某些时候,在读取最后一个块时,您不会想要读取完整的 1024 个字节,具体取决于长度前缀表示的总数以及您已读取的字节数。

【讨论】:

你真的确定最后一句话吗?看这个例子 - msdn.microsoft.com/en-us/library/dxkwh6zw.aspx 没有保护过度阅读。当读取返回 0 字节时,读取才会连接在一起。缓冲区大小为 const=1024。 @ryeguy:我说的完全不对。编辑得更清楚。【参考方案7】:

否 - 从回调处理程序再次调用 BeginReceive,直到 EndReceive 返回 0。基本上,您应该继续异步接收,假设您想要充分利用异步 IO。

如果您查看 Socket.BeginReceive 的 MSDN 页面,您会看到一个示例。 (诚​​然,这并不像想象的那么容易。)

【讨论】:

我尝试了这种方法,但在我的情况下,EndReceive 永远不会返回零字节。有什么理由吗? @sura: 也许网络连接的另一端从来没有关闭过连接? @JonSkeet 如果我有一个持久连接并且收到了部分消息,如果部分消息完成,EndReceive 是否能够返回 0? @FaizanRabbani:“部分消息”是什么意思?据我所知,EndReceive 只会在流的末尾返回 0。 @JonSkeet 部分消息,如果您认为应该传输 AZ 字母,但由于某些网络问题,A 到 M 被传输,然后过一段时间让我们说 100 毫秒 N 到 Z 被传输。跨度>

以上是关于C# Begin/EndReceive - 我如何读取大数据?的主要内容,如果未能解决你的问题,请参考以下文章

我如何在 webBrowser 中使用 c# 执行点击事件

如何在 C# 中编写我自己的包装器?

我如何提取 PayPal 响应 C#

如何解析 C# 的命令行输出?

如何在 C# 中计时执行 SQL 查询?

如何在 c++ 中调用 c# 方法?