使用 Fetch Streams API 异步消费分块数据而不使用递归
Posted
技术标签:
【中文标题】使用 Fetch Streams API 异步消费分块数据而不使用递归【英文标题】:Use Fetch Streams API to consume chunked data asynchronously without using recursion 【发布时间】:2019-03-17 05:44:15 【问题描述】:我正在使用 javascript fetch
streams API 来异步使用分块的 JSON,就像在 this answer 中一样。
我的应用程序可能在一小时内每秒接收多达 25 个小型 JSON 对象(视频中的每一帧一个)。
当传入的块很大(每个块 1000 多个 JSON 对象)时,我的代码运行良好 - 速度快,内存使用量最少 - 它可以轻松可靠地接收 1,000,000 个 JSON 对象。
当传入的块较小(每个块 5 个 JSON 对象)时,我的代码运行不佳 - 速度慢,内存消耗大。浏览器在大约 50,000 个 JSON 对象时死掉。
在开发者工具中进行大量调试后,问题似乎在于代码的递归性质。
我试图删除递归,但它似乎是必需的,因为 API 依赖于我的代码返回对链的承诺?!
如何移除这个递归,或者我应该使用fetch
以外的东西?
递归代码(有效)
String.prototype.replaceAll = function(search, replacement)
var target = this;
return target.replace(new RegExp(search, 'g'), replacement);
;
results = []
fetch('http://localhost:9999/').then(response =>
const reader = response.body.getReader();
td = new TextDecoder("utf-8");
buffer = "";
reader.read().then(function processText( done, value )
if (done)
console.log("Stream done.");
return;
try
decoded = td.decode(value);
buffer += decoded;
if (decoded.length != 65536)
toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
result = JSON.parse(toParse);
results.push(...result);
console.log("Received " + results.length.toString() + " objects")
buffer = "";
catch(e)
// Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
//console.log("EXCEPTION:"+e);
return reader.read().then(processText);
)
);
没有递归的代码(不起作用)
String.prototype.replaceAll = function(search, replacement)
var target = this;
return target.replace(new RegExp(search, 'g'), replacement);
;
results = []
finished = false
fetch('http://localhost:9999/').then(response =>
const reader = response.body.getReader();
td = new TextDecoder("utf-8");
buffer = "";
lastResultSize = -1
while (!finished)
if (lastResultSize < results.length)
lastResultSize = results.length;
reader.read().then(function processText( done, value )
if (done)
console.log("Stream done.");
finished = true;
return;
else
try
decoded = td.decode(value);
//console.log("Received chunk " + decoded.length.toString() + " in length");
buffer += decoded;
if (decoded.length != 65536)
toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
result = JSON.parse(toParse);
results.push(...result);
console.log("Received " + results.length.toString() + " objects")
buffer = "";
//console.log("Parsed chunk " + toParse.length.toString() + " in length");
catch(e)
// Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
//console.log("EXCEPTION:"+e);
)
);
为了完整起见,这里是我在测试服务器上使用的 python 代码。注意包含 sleep 的行,它改变了分块行为:
import io
import urllib
import inspect
from http.server import HTTPServer,BaseHTTPRequestHandler
from time import sleep
class TestServer(BaseHTTPRequestHandler):
def do_GET(self):
args = urllib.parse.parse_qs(self.path[2:])
args = i:args[i][0] for i in args
response = ''
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Transfer-Encoding', 'chunked')
self.end_headers()
for i in range (1000000):
self.wfile.write(bytes(f'"x":i, "text":"fred!"\n','utf-8'))
sleep(0.001) # Comment this out for bigger chunks sent to the client!
def main(server_port:"Port to serve on."=9999,server_address:"Local server name."=''):
httpd = HTTPServer((server_address, server_port), TestServer)
print(f'Serving on http://httpd.server_name:httpd.server_port ...')
httpd.serve_forever()
if __name__ == '__main__':
main()
【问题讨论】:
您是否在其他浏览器中尝试过此操作,是否在其他浏览器中观察到相同的行为? 我在 Firefox 和 Chrome 中都试过这个。它们的行为相似,尽管具有不同的性能配置文件 - 例如。 Firefox 运行时间更长但速度更慢,Chrome 运行速度更快但接收的 JavaScript 对象更少。 可能值得提交浏览器错误,以从他们那里获得有关其实现的见解,以及需求的性质是否以某种方式限制了它们,以便更好地针对这种情况进行优化。最坏的情况是,回复说他们知道这对开发人员来说可能是一个问题,但他们不打算改变他们的实现。最好的情况是,他们同意这是一个应该可行的场景,并且他们更改了他们的实现,这样浏览器就不会最终耗尽内存并在该场景中崩溃。 我不确定浏览器本身是问题的根源 - 在浏览器中,保守深度的递归仍然可以。我主要担心的是我看不到如何实现该算法的迭代版本,而且看起来 Fetch Streams API 确实是在假设递归为最佳情况的情况下编写的,而在我的情况下,情况并非如此。由于我在这里没有太多活动,我可能会尝试 Streams API 工作组 (github.com/whatwg/streams) 并将它们指向这里。感谢您的质量保证和想法 :) 【参考方案1】:您缺少的部分是传递给.then()
的函数始终是异步调用的,即使用空堆栈。所以这里没有实际的递归。这也是为什么您的“无递归”版本不起作用的原因。
解决这个问题的简单方法是使用异步函数和 await 语句。如果你像这样调用 read():
const value, done = await reader.read();
...然后您可以循环调用它,它会按照您的预期工作。
我不知道您的内存泄漏具体在哪里,但是您对全局变量的使用似乎是个问题。我建议您始终将'use strict';
放在代码的顶部,以便编译器为您捕获这些问题。然后在声明变量时使用let
或const
。
我建议您使用TextDecoderStream 来避免字符在多个块之间拆分时出现问题。当 JSON 对象在多个块之间拆分时,您也会遇到问题。
请参阅 Append child writable stream demo 了解如何安全地执行此操作(但请注意,您需要 TextDecoderStream,该演示具有“TextDecoder”)。
还要注意该演示中使用了 WritableStream。 Firefox 还不支持它 AFAIK,但 WritableStream 提供了更简单的语法来使用块,而无需显式循环或递归。你可以找到 web 流 polyfill here。
【讨论】:
感谢您的详尽回答。最后,罪魁祸首实际上是 Chrome/Firefox 调试器本身!!我将Console.Log
更改为 document.write
以进行调试输出,关闭调试器,第一个代码示例在大块和小块上都可以正常工作。我的代码确实已经处理了拆分字符/JSON,虽然不是很优雅。所以我按照建议重新编写了:pastebin.com/hWGtQZvA,但它的性能不如我的原始代码(令人惊讶)。因此,最快的 100% 本机代码最终成为 pastebin.com/Dr1CQVa2,我现在将进一步完善它。谢谢!
pipeThrough() 尚未优化,因此 read() 仍然提供最佳性能,正如您所发现的。
有没有办法识别数据何时被分成不同的块? (在他的情况下,您建议使用 TextDecoderStream,但在我的情况下,我收到的字节数有时会分成不同的块......)
要识别一个字符何时被分成多个块,我认为您需要自己解码这些字符。如果输入的是UTF-8,这并不算太难,但当然会很慢。以上是关于使用 Fetch Streams API 异步消费分块数据而不使用递归的主要内容,如果未能解决你的问题,请参考以下文章
用于事件过滤的 Kafka Consumer API 与 Streams API
使用 fetch 在节点中进行异步 api 调用的最佳实践?