StreamReader缓冲GZIP的流式HTTP?

Posted

技术标签:

【中文标题】StreamReader缓冲GZIP的流式HTTP?【英文标题】:Streamed HTTP with GZIP being buffered by StreamReader? 【发布时间】:2013-01-23 11:52:33 【问题描述】:

努力寻找遇到类似问题或类似问题的人。

我目前正在使用具有 GZip 要求的 http (json) 流,并且从发送数据到 reader.ReadLine() 读取它时遇到了延迟。有人向我建议,这可能与将数据保存在缓冲区中的解码有关?

这是我目前拥有的,除了延迟之外它工作正常。

HttpWebRequest request = (HttpWebRequest)WebRequest.Create(endPoint);
request.Method = "GET";

request.PreAuthenticate = true;
request.Credentials = new NetworkCredential(username, password);

request.AutomaticDecompression = DecompressionMethods.GZip;
request.ContentType = "application/json";
request.Accept = "application/json";
request.Timeout = 30;
request.BeginGetResponse(AsyncCallback, request);

然后在我的 AsyncCallback 方法中:

HttpWebRequest request = result.AsyncState as HttpWebRequest;

using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
using (Stream stream = response.GetResponseStream())
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8))


    while (!reader.EndOfStream)
    
        string line = reader.ReadLine();
        if (string.IsNullOrWhiteSpace(line)) continue;

        Console.WriteLine(line);
    

它只是停留在reader.Readline() 上,直到接收到更多数据,然后甚至保留其中的一些数据。还收到了保持活动的换行符,当它决定读取某些内容时,这些通常会立即全部读出。

我已经测试了与 curl 命令并行运行的流,curl 命令可以很好地接收和解压缩数据。

任何见解都会很棒。 谢谢,

编辑 在流式阅读器上使用缓冲区大小没有运气。

new StreamReader(stream, Encoding.UTF8, true, 1)

编辑 也没有运气更新到 .NET 4.5 并使用

request.AllowReadStreamBuffering = false;

【问题讨论】:

嗯..为什么不使用reader.ReadToEnd() 这是一个 http 流,在很长一段时间内保持打开状态。所以我需要处理每一行。我相信 .ReadToEnd() 会等到 EndOfStream 收到?这不太可能发生。 啊,所以这是一种保持活动状态的连接,您可以在其中获得增量响应? @Dan - 试试request.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate; 和 lmk! @AppDeveloper 恐怕没有运气,但我知道它明确使用 gzip。我有一种感觉是延迟被引入的地方。 【参考方案1】:

更新:这似乎在长时间使用较高的音量时会出现问题,并且只应在缓冲区影响应用程序功能的小音量上使用。我已经切换回StreamReader

所以这就是我最终想出的。这有效,没有延迟。这不会被自动 GZip 解压缩缓冲。

using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
using (Stream stream = response.GetResponseStream())
using (MemoryStream memory = new MemoryStream())
using (GZipStream gzip = new GZipStream(memory, CompressionMode.Decompress))

    byte[] compressedBuffer = new byte[8192];
    byte[] uncompressedBuffer = new byte[8192];
    List<byte> output = new List<byte>();

    while (stream.CanRead)
    
        int readCount = stream.Read(compressedBuffer, 0, compressedBuffer.Length);

        memory.Write(compressedBuffer.Take(readCount).ToArray(), 0, readCount);
        memory.Position = 0;

        int uncompressedLength = gzip.Read(uncompressedBuffer, 0, uncompressedBuffer.Length);

        output.AddRange(uncompressedBuffer.Take(uncompressedLength));

        if (!output.Contains(0x0A)) continue;

        byte[] bytesToDecode = output.Take(output.LastIndexOf(0x0A) + 1).ToArray();
        string outputString = Encoding.UTF8.GetString(bytesToDecode);
        output.RemoveRange(0, bytesToDecode.Length);

        string[] lines = outputString.Split(new[]  Environment.NewLine , new StringSplitOptions());
        for (int i = 0; i < (lines.Length - 1); i++)
        
            Console.WriteLine(lines[i]);
        

        memory.SetLength(0);
    

【讨论】:

【参考方案2】:

C.Evenhuis 讨论的延迟 ACK 可能有些问题,但我有一种奇怪的直觉,感觉是 StreamReader 让你头疼……你可以试试这样:

public void AsyncCallback(IAsyncResult result)

    HttpWebRequest request = result.AsyncState as HttpWebRequest;   
    using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
    using (Stream stream = response.GetResponseStream())
    
        var buffer = new byte[2048];
        while(stream.CanRead)
        
            var readCount = stream.Read(buffer, 0, buffer.Length);
            var line = Encoding.UTF8.GetString(buffer.Take(readCount).ToArray());
            Console.WriteLine(line);
        
    

编辑:这是我用来测试这个理论的完整工具(也许与你的情况不同会突然出现)

(LINQPad 就绪)

void Main()

    Task.Factory.StartNew(() => Listener());
    _blocker.WaitOne();
    Request();


public bool _running;
public ManualResetEvent _blocker = new ManualResetEvent(false);

public void Listener()

    var listener = new HttpListener();
    listener.Prefixes.Add("http://localhost:8080/");
    listener.Start();
    "Listener is listening...".Dump();;
    _running = true;
    _blocker.Set();
    var ctx = listener.GetContext();
    "Listener got context".Dump();
    ctx.Response.KeepAlive = true;
    ctx.Response.ContentType = "application/json";
    var outputStream = ctx.Response.OutputStream;
    using(var zipStream = new GZipStream(outputStream, CompressionMode.Compress))
    using(var writer = new StreamWriter(outputStream))
    
        var lineCount = 0;
        while(_running && lineCount++ < 10)
        
            writer.WriteLine(" \"foo\": \"bar\"");
            "Listener wrote line, taking a nap...".Dump();
            writer.Flush();
            Thread.Sleep(1000);
        
    
    listener.Stop();


public void Request()

    var endPoint = "http://localhost:8080";
    var username = "";
    var password = "";
    HttpWebRequest request = (HttpWebRequest)WebRequest.Create(endPoint);
    request.Method = "GET";

    request.PreAuthenticate = true;
    request.Credentials = new NetworkCredential(username, password);

    request.AutomaticDecompression = DecompressionMethods.GZip;
    request.ContentType = "application/json";
    request.Accept = "application/json";
    request.Timeout = 30;
    request.BeginGetResponse(AsyncCallback, request);


public void AsyncCallback(IAsyncResult result)

    Console.WriteLine("In AsyncCallback");    
    HttpWebRequest request = result.AsyncState as HttpWebRequest;    
    using (HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result))
    using (Stream stream = response.GetResponseStream())
    
        while(stream.CanRead)
        
            var buffer = new byte[2048];
            var readCount = stream.Read(buffer, 0, buffer.Length);
            var line = Encoding.UTF8.GetString(buffer.Take(readCount).ToArray());
            Console.WriteLine("Reader got:" + line);
        
    

输出:

Listener is listening...
Listener got context
Listener wrote line, taking a nap...
In AsyncCallback
Reader got: "foo": "bar"

Listener wrote line, taking a nap...
Reader got: "foo": "bar"

Listener wrote line, taking a nap...
Reader got: "foo": "bar"

Listener wrote line, taking a nap...
Reader got: "foo": "bar"

Listener wrote line, taking a nap...
Reader got: "foo": "bar"

Listener wrote line, taking a nap...
Reader got: "foo": "bar"

【讨论】:

谢谢,但运气不好。同样的行为,当线路通过时,其中有多个已等待的数据位。我认为问题出在某个地方的 gzip。 嗯...奇怪,我拼凑了一个快速的安全带来测试它,并看到了我认为你所追求的...让我将完整的安全带附加到我的回答。 这实际上非常有用,它复制了我遇到的问题。对您的代码进行两次小的更改来修复它。将 GZipStream 传递给 writer。并添加 ctx.Response.AddHeader("Content-Encoding", "gzip"); 哈 - 是的,这两个变化会有所不同...... :) 让我想想,如果我想出任何东西,我会增加这个答案。 这是使用您的侦听器和您的代码的输出。 screencast.com/t/Po5WbK4eVw1【参考方案3】:

这可能与 Delayed ACK 结合 Nagle 算法有关。当服务器连续发送多个小响应时会发生这种情况。

在服务器端,发送第一个响应,但只有在服务器收到来自客户端的 ACK 后,或者直到有足够的数据可以发送大数据包(Nagle 算法)时,才会发送后续响应数据块。

在客户端,收到响应的第一位,但不会立即发送 ACK - 由于传统应用程序具有 request-response-request-response 行为,它假设它可以将 ACK 与下一个请求一起发送- 在你的情况下不会发生。

在一段固定的时间(500 毫秒?)之后,它决定发送 ACK,导致服务器发送它迄今为止累积的下一个包。

问题(如果这确实是您遇到的问题)可以通过设置NoDelay 属性在服务器端在套接字级别修复,禁用 Nagle 算法。我认为您也可以在整个操作系统范围内禁用它。

您还可以在客户端暂时禁用延迟 ACK(我知道 windows 有一个注册表项)以查看这是否确实是问题所在,而无需更改服务器上的任何内容。延迟 ACK 可防止 DDOS 攻击,因此请确保事后恢复设置。

不那么频繁地发送 keepalive 也可能会有所帮助,但您仍有可能出现问题。

【讨论】:

感谢您的回复,但这似乎不是问题所在。我也无权访问服务器。该服务器已被证明适用于其他语言,并且我在 curl 中运行连接没有任何问题,它可以接收每一行。在本地禁用 Nagle 没有任何效果。不过,我将暂时研究一种没有 HttpWebRequest 的方法。 如果这是问题所在,则可能是服务器的 Nagle 导致了问题。可惜我帮不了你,祝你好运。

以上是关于StreamReader缓冲GZIP的流式HTTP?的主要内容,如果未能解决你的问题,请参考以下文章

如何将对象流式传输到压缩的 json?

C# 只读第一行,使用压缩文本文件的 StreamReader

在 Java 中连接重置后恢复流式传输 GZIP 文件

如何实时流式传输音频文件

Nginx:17---反向代理之(反向代理服务器的性能调优:缓冲数据缓存数据存储数据压缩数据(gzip模块))

删除不在 BigQuery 流式缓冲区中的行