将 Node.js 流错误事件传播到异步等待样式代码

Posted

技术标签:

【中文标题】将 Node.js 流错误事件传播到异步等待样式代码【英文标题】:Propagating Node.js stream error events to async await style code 【发布时间】:2021-11-09 03:11:00 【问题描述】:

我使用可读和转换流,稍后我使用for await 消费。

我找不到处理被调用者的流错误的方法,因此它们可以在调用者函数中被捕获。

例如如果transform 抛出,它会导致未捕获的错误。

如果我将错误侦听器添加到transform,自然不会将错误传播到 main 的 catch。

function handler(err) 
  // Log the error here
  // If throw from here it won't be cauch in main
  // How do I propagate it to main?


function getStream() 
  // transform1 and transform2 are custom transform streams
  readable.pipe(csvParser).pipe(transform1).pipe(transform2);
  readable.on('error', handler);
  csvParser.on('error', handler);
  transform1.on('error', handler);
  transform2.on('error', handler);
  return transform2;


async function main() 
  try 
    const stream = getStream();
    for await(const chunk of stream) 
      // process chunk
    
   catch (ex) 
    // how to catch transform errors here?
  

有什么办法吗?

【问题讨论】:

您从getStream 函数中从哪里获得transform @h-sifat 我自己写的自定义转换流 【参考方案1】:

据我了解,async/await 在很大程度上是 promise 之上的语法糖。具体来说,我认为由于 ReadableStream.pipe 遵循基于事件(on("error", errorHandler))而不是基于承诺的模式,try ... await ... catch (ex) ... 构造不会像您可能那样无缝地处理 ReadableStream.pipe 中“抛出”的异步错误希望。

假设我的理解是正确的(老实说,我不经常使用 aysnc/await 语法,所以我也可能遗漏了一些东西)一种简单的解决方法是退回到添加基于回调的像这样的事件处理程序:

function handleError(err)  ... 

readable.once("error", handleError);

// and if you want, re-use that handler within your catch block, like:
// ` catch (ex)  handleError(ex); `
// but I'm not sure how often that will come up if you follow this pattern

但您可能已经意识到这一点。

如果你真的想在这种情况下使用 aysnc/await/try/catch 风格的构造,你可以使用 util.promisify 之类的东西来将 ReadableStream 使用的基于on("error", handler)-event-based API 转换为承诺。这篇 *** 帖子 - How to use Async await using util promisify? - 更深入地涵盖了该主题,但 (IMO) 似乎需要跳过很多圈才能避免添加 readable.once("error" /* ...whatever you'd otherwise have in your catch block ... */)

简而言之,我认为因为 ReadableStream.pipe 不是围绕 Promise 设计的,所以 async/await 语法(本身)不足以确保可能作为错误事件发出的异步错误是被 try/catch 块困住。您需要以老式方式直接处理这些错误(通过为readable 发出的错误事件注册一个处理程序,在这种情况下等待和尝试/捕获的东西不直接适用)或“间接"(通过创建一个适配器,使这些发出的事件像已解决的 promise 上的 catch 情况一样冒泡,在这种情况下,您可以使用 async/await 使其看起来像同步风格的 try/catch)。

【讨论】:

这一切都是正确的,但假设在高级别上要向用户显示它需要抛出的错误,我可以通过添加一个 log 错误>error 侦听器,但我无法将其抛出并显示给用户。那是我的问题。 好吧,我很确定您可以使用 promisify 包装错误处理程序,以允许 async/await/try/catch 关键字按照原始帖子中的暗示工作(如上面的 SO 链接中所述) 或者,如果您的 onError 处理程序类似于function onError(err) throw err; ,那是否不足以让错误冒泡到通用的“未处理异常”处理程序?更一般地说,如果您不依赖默认的未处理案例,您可以从 onError 处理程序调用任何使用的报告机制,就像您在 catch 块中所做的那样。 不,这还不够。 onError 处理程序中引发的错误不会冒泡到 main。我可以将整个 getStream 包装成一个承诺,但我不能等待它。因为那样执行就会停止。我需要开始执行for await 循环以触发转换错误。这意味着我不能等待 getStream 返回的承诺,因为它永远不会解决。【参考方案2】:

我认为转换流函数不应该抛出异常。相反,它应该发出错误事件或将错误传递给回调函数。

这是一个例子。

编辑

使用 try catch 块将所有内容包装在 transform 方法中。并传播到主函数。

const  Transform  = require("stream");

//Added on 1st edit
function handler(ex) 
  console.error("Logged By Handler: ", ex);


function getStream() 
  // A simple transform stream to test
  const transform = new Transform(
    transform(chunk, encoding, callback) 
      try 
        chunk = chunk.toString();

        // if chunk == "err" then we want to throw an error
        // to simulate a real life error
        if (chunk === "err\n") 
        return callback("Oops! Sth failed in the transform stream.", null);
        // or this.emit("error", "Oops! Sth failed in the transform stream.");

        this.push(chunk.toUpperCase());

        // Simulating exception
        throw new Error(`Fatal error.`);
        callback();
       catch (ex) 
        handler(ex);
        callback(ex, null);
      
    ,
  );

  process.stdin.pipe(transform);
  // readable    ->   transform
  return transform;


async function main(transform) 
  try 
    for await (const chunk of transform) process.stdout.write(chunk);
   catch (ex) 
    console.error("Handled by main:", ex);
  


main(getStream());

【讨论】:

如果转换中出现错误,它将在转换时作为error 事件发出。这并不能解决我在 main 中捕获它的问题。我可以添加error 事件侦听器,但我无法将错误从那里传播到主错误处理程序。也可以以相同的方式从csvParser 流中发出错误。我还可以添加一个error listenere。但我也无法在主错误处理程序中捕获它。 I also cannot catch it in main error handler 是什么意思?在我的代码中,它在 main 函数中捕获了错误。但是堆栈跟踪丢失了,我不知道为什么! 哦,等你更新了你的代码!让我检查 嘿@krl 我已经按照你的代码模式添加了一个像你一样的错误处理程序。但它仍然传播到主函数! 如果我扔进任何on('error'),Node.js 崩溃会导致它最终未被处理。【参考方案3】:

我最终用Promise.race解决了这个问题:

function readStream(stream) 
  return Promise.race([
    new Promise((_, reject) => stream.once('error', reject)),
    iterate(stream)
  ]);


async function iterate(stream) 
  for await (const object of stream) 
    // ...
  

这将拒绝流或 iterate 函数中的任何错误,或以 iterate 的结果解决,以先到者为准。

【讨论】:

以上是关于将 Node.js 流错误事件传播到异步等待样式代码的主要内容,如果未能解决你的问题,请参考以下文章

Node.js/Mongoose 等待异步 For 循环

如何编写一个在“返回”之前等待事件触发的 node.js 函数?

嵌套 Promise 不会将错误传播到 Node.js 中的父 Promise?

一文搞懂Node.js以及浏览器中的事件循环机制

Node.js 异步等待快递

Node.js mongodb 驱动程序异步/等待查询