使用 Fetch Streams API 异步消费分块数据而不使用递归

Posted

技术标签:

【中文标题】使用 Fetch Streams API 异步消费分块数据而不使用递归【英文标题】:Use Fetch Streams API to consume chunked data asynchronously without using recursion 【发布时间】:2019-03-17 05:44:15 【问题描述】:

我正在使用 javascript fetch streams API 来异步使用分块的 JSON,就像在 this answer 中一样。

我的应用程序可能在一小时内每秒接收多达 25 个小型 JSON 对象(视频中的每一帧一个)。

当传入的块很大(每个块 1000 多个 JSON 对象)时,我的代码运行良好 - 速度快,内存使用量最少 - 它可以轻松可靠地接收 1,000,000 个 JSON 对象。

当传入的块较小(每个块 5 个 JSON 对象)时,我的代码运行不佳 - 速度慢,内存消耗大。浏览器在大约 50,000 个 JSON 对象时死掉。

在开发者工具中进行大量调试后,问题似乎在于代码的递归性质。

我试图删除递归,但它似乎是必需的,因为 API 依赖于我的代码返回对链的承诺?!

如何移除这个递归,或者我应该使用fetch以外的东西?


递归代码(有效)

String.prototype.replaceAll = function(search, replacement) 
    var target = this;
    return target.replace(new RegExp(search, 'g'), replacement);
;

results = []

fetch('http://localhost:9999/').then(response => 
    const reader = response.body.getReader();
    td = new TextDecoder("utf-8");
    buffer = "";

    reader.read().then(function processText( done, value ) 
        if (done) 
          console.log("Stream done.");
          return;
        

        try 
            decoded = td.decode(value);
            buffer += decoded;
            if (decoded.length != 65536)
                toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
                result = JSON.parse(toParse);
                results.push(...result);
                console.log("Received " + results.length.toString() + " objects")
                buffer = "";
            
        
        catch(e)
            // Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
            //console.log("EXCEPTION:"+e);
        

        return reader.read().then(processText);
    )
);

没有递归的代码(不起作用)

String.prototype.replaceAll = function(search, replacement) 
    var target = this;
    return target.replace(new RegExp(search, 'g'), replacement);
;

results = []
finished = false

fetch('http://localhost:9999/').then(response => 
    const reader = response.body.getReader();
    td = new TextDecoder("utf-8");
    buffer = "";
    lastResultSize = -1

    while (!finished)
        if (lastResultSize < results.length)
        
            lastResultSize = results.length;
            reader.read().then(function processText( done, value ) 

                if (done) 
                  console.log("Stream done.");
                  finished = true;
                  return;
                
                else
                    try 
                        decoded = td.decode(value);
                        //console.log("Received chunk " + decoded.length.toString() + " in length");
                        buffer += decoded;
                        if (decoded.length != 65536)
                            toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
                            result = JSON.parse(toParse);
                            results.push(...result);
                            console.log("Received " + results.length.toString() + " objects")
                            buffer = "";
                            //console.log("Parsed chunk " + toParse.length.toString() + " in length");
                        
                    
                    catch(e) 
                        // Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
                        //console.log("EXCEPTION:"+e);
                    
            )
        
);

为了完整起见,这里是我在测试服务器上使用的 python 代码。注意包含 sleep 的行,它改变了分块行为:

import io
import urllib
import inspect
from http.server import HTTPServer,BaseHTTPRequestHandler
from time import sleep


class TestServer(BaseHTTPRequestHandler):

    def do_GET(self):
        args = urllib.parse.parse_qs(self.path[2:])
        args = i:args[i][0] for i in args
        response = ''

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Transfer-Encoding', 'chunked')
        self.end_headers()

        for i in range (1000000):
            self.wfile.write(bytes(f'"x":i, "text":"fred!"\n','utf-8'))
            sleep(0.001)  # Comment this out for bigger chunks sent to the client!

def main(server_port:"Port to serve on."=9999,server_address:"Local server name."=''):
    httpd = HTTPServer((server_address, server_port), TestServer)
    print(f'Serving on http://httpd.server_name:httpd.server_port ...')
    httpd.serve_forever()


if __name__ == '__main__':
    main()

【问题讨论】:

您是否在其他浏览器中尝试过此操作,是否在其他浏览器中观察到相同的行为? 我在 Firefox 和 Chrome 中都试过这个。它们的行为相似,尽管具有不同的性能配置文件 - 例如。 Firefox 运行时间更长但速度更慢,Chrome 运行速度更快但接收的 JavaScript 对象更少。 可能值得提交浏览器错误,以从他们那里获得有关其实现的见解,以及需求的性质是否以某种方式限制了它们,以便更好地针对这种情况进行优化。最坏的情况是,回复说他们知道这对开发人员来说可能是一个问题,但他们不打算改变他们的实现。最好的情况是,他们同意这是一个应该可行的场景,并且他们更改了他们的实现,这样浏览器就不会最终耗尽内存并在该场景中崩溃。 我不确定浏览器本身是问题的根源 - 在浏览器中,保守深度的递归仍然可以。我主要担心的是我看不到如何实现该算法的迭代版本,而且看起来 Fetch Streams API 确实是在假设递归为最佳情况的情况下编写的,而在我的情况下,情况并非如此。由于我在这里没有太多活动,我可能会尝试 Streams API 工作组 (github.com/whatwg/streams) 并将它们指向这里。感谢您的质量保证和想法 :) 【参考方案1】:

您缺少的部分是传递给.then() 的函数始终是异步调用的,即使用空堆栈。所以这里没有实际的递归。这也是为什么您的“无递归”版本不起作用的原因。

解决这个问题的简单方法是使用异步函数和 await 语句。如果你像这样调用 read():

const value, done = await reader.read();

...然后您可以循环调用它,它会按照您的预期工作。

我不知道您的内存泄漏具体在哪里,但是您对全局变量的使用似乎是个问题。我建议您始终将'use strict'; 放在代码的顶部,以便编译器为您捕获这些问题。然后在声明变量时使用letconst

我建议您使用TextDecoderStream 来避免字符在多个块之间拆分时出现问题。当 JSON 对象在多个块之间拆分时,您也会遇到问题。

请参阅 Append child writable stream demo 了解如何安全地执行此操作(但请注意,您需要 TextDecoderStream,该演示具有“TextDecoder”)。

还要注意该演示中使用了 WritableStream。 Firefox 还不支持它 AFAIK,但 WritableStream 提供了更简单的语法来使用块,而无需显式循环或递归。你可以找到 web 流 polyfill here。

【讨论】:

感谢您的详尽回答。最后,罪魁祸首实际上是 Chrome/Firefox 调试器本身!!我将 Console.Log 更改为 document.write 以进行调试输出,关闭调试器,第一个代码示例在大块和小块上都可以正常工作。我的代码确实已经处理了拆分字符/JSON,虽然不是很优雅。所以我按照建议重新编写了:pastebin.com/hWGtQZvA,但它的性能不如我的原始代码(令人惊讶)。因此,最快的 100% 本机代码最终成为 pastebin.com/Dr1CQVa2,我现在将进一步完善它。谢谢! pipeThrough() 尚未优化,因此 read() 仍然提供最佳性能,正如您所发现的。 有没有办法识别数据何时被分成不同的块? (在他的情况下,您建议使用 TextDecoderStream,但在我的情况下,我收到的字节数有时会分成不同的块......) 要识别一个字符何时被分成多个块,我认为您需要自己解码这些字符。如果输入的是UTF-8,这并不算太难,但当然会很慢。

以上是关于使用 Fetch Streams API 异步消费分块数据而不使用递归的主要内容,如果未能解决你的问题,请参考以下文章

用于事件过滤的 Kafka Consumer API 与 Streams API

使用 fetch 在节点中进行异步 api 调用的最佳实践?

如何在变量Fetch API中存储异步数据

Redis5新特性Streams作消息队列

JS fetch API:如何使用一个异步函数从多个文件中获取内容?

使用fetch()异步请求API数据实现汇率转换器