使用 Express 将 BigQuery 流式传输到前端

Posted

技术标签:

【中文标题】使用 Express 将 BigQuery 流式传输到前端【英文标题】:BigQuery stream to front-end with Express 【发布时间】:2019-10-25 03:56:54 【问题描述】:

我正在尝试从BigQuery 读取查询并将其流式传输到前端。在带有 Express 的 Node.js-land 中,这将是:

app.get('/endpoint', (req, res) => 
  bigQuery.createQueryStream(query).pipe(res);
);

但是,createQueryStream() 确实创建 Node.js 流,而是返回 表行 的自定义流对象,因此它失败了:

(node:21236) UnhandledPromiseRejectionWarning: TypeError [ERR_INVALID_ARG_TYPE]: 第一个参数必须是字符串或缓冲区类型之一。接收到的类型对象

这在official documentation中得到了证实:

bigquery.createQueryStream(query)
  .on('data', function(row) 
    // row is a result from your query.
  )

那么,有没有办法将 BigQuery 数据流式传输到前端?我想到了两种可能的解决方案,但想知道是否有人知道更好的方法:

JSON.stringify() 行并返回 JSONL 而不是普通的 JSON。这增加了解码它的前端负担,但在双方都相当容易。 移至REST API 并使用request 进行实际流式传输,例如:request(url, body: query, params ).pipe(res)(或任何特定的API,还没有在那里挖掘)。

我很困惑,一个说它可以流式传输的 Node.js 库不适用于 Node.js 原生流,但情况似乎是这样。

【问题讨论】:

你找到解决办法了吗? 亲切!我们最终手动构建了一个 JSON,然后以一种非常老套的方式“流式传输”每一行。但是我们试图流式传输大量数据,而 BigQuery 仍然很慢。我们在某个时候(出于其他原因!)转向了 Python,并希望那里的库会更成熟,但不,它实际上差很多 @tjbandes 这就是我们最终要做的:***.com/a/64746763/12271991 我明白了,感谢您的更新!似乎将结果与[ , ] 连接起来有点违背了流媒体的目的,对吧?我想您正在服务器端节省内存,但客户端仍然需要等待整个数组才能JSON.parse 是的,完全同意。我什至不确定我们是否在服务器端节省内存,但我们可能会。另一个小优点是数据以块的形式发送,因此与等待和一次性发送数据相比在最后一个块到达服务器后(较低的后端前端带宽需要)。 【参考方案1】:

BigQuery 旨在与为不同编程语言编写的大量不同客户端库一起使用,因此,它不返回特定于 nodejs 的数据结构,而是返回更通用的结构,这些结构几乎适用于任何结构化编程语言,如物体。回答您的问题,是的,有一种将 BigQuery 数据流式传输到前端的方法,但这是一个相当个人的选择,因为它所需要的只是从一种数据类型转换为另一种数据类型。但是,我想说最直接的方法是致电JSON.stringify(),您已经提到过。

希望对你有帮助。

【讨论】:

当然,但这是一个 Node.js 库,它说它可以流式传输,所以我认为 it does Node.js streaming 是非常安全的。那么这里的命名很混乱,感谢您的帮助! @Francisco 它执行 Node.js 流,你是对的,但它返回的流是 Node.js 中的标准数据类型。您共享的文档说返回的对象是可读的流对象,因此您可以通过多种方式将其转换为字符串。其中之一就是library 哦,对不起,我的意思是我认为它会做 Node.js 流,但 它不会,你不能 .pipe() 直接到 Node.js ReadableStream 因为它'会抛出错误。所以它不是一个 Node.js 流兼容库。这就是我感到困惑的原因,BigQuery 似乎不支持 Node.js 流,它只支持看起来相似但工作方式不同的东西。【参考方案2】:

我们最终完成了一个实现,将 BigQuery 的回复拼接到一个大的 JSON 数组中:

exports.stream = (query, params, res) => 
  // Light testing for descriptive errors in the parameters
  for (let key in params) 
    if (typeof params[key] === "number" && isNaN(params[key])) 
      throw new TypeError(`The parameter "$key" should be a number`);
    
  

  return new Promise((resolve, reject) => 
    let prev = false;

    const onData = row => 
      try 
        // Only handle it when there's a row
        if (!row) return;

        // There was a previous row written before, so add a comma
        if (prev) 
          res.write(",");
        
        res.write(stringify(row));
        prev = true;
       catch (error) 
        console.error("Cannot parse row:", error);
        // Just ignore it, don't write this frame
      
    ;

    const onEnd = () => 
      res.write("]");
      res.end();
      resolve();
    ;

    res.writeHead(200,  "Content-Type": "application/json" );
    res.write("[");
    bigQuery
      .createQueryStream( query, params )
      .on("error", reject)
      .on("data", onData)
      .on("end", onEnd);
  );
;

它将通过拼接构建一个大型 JSON 数组:

[   // <- First character sent
stringify(row1)   // <- First row
,   // <- add comma on second row iteration
stringify(row2)   // <- Second row
...
stringify(rowN)   // <- Last row
]   // <- Send the "]" character to close the array

这样有好处:

数据会尽快发送,因此带宽需求较低。 (取决于 BigQuery 实现)服务器端的内存需求较低,因为并非所有数据都同时保存在内存中,只有小块。

【讨论】:

以上是关于使用 Express 将 BigQuery 流式传输到前端的主要内容,如果未能解决你的问题,请参考以下文章

使用 Java 将 JSON 流式传输到 BigQuery

将 JSON 流式传输到 Bigquery

Google BigQuery - 将数据流式传输到 BigQuery

使用java将json数据流式传输到Bigquery中。不使用作业加载数据

使用 GET 方法将数据流式传输到 Google BigQuery?

使用现有架构将表数据从一个 BigQuery 表流式传输到另一个