使用 Express 将 BigQuery 流式传输到前端
Posted
技术标签:
【中文标题】使用 Express 将 BigQuery 流式传输到前端【英文标题】:BigQuery stream to front-end with Express 【发布时间】:2019-10-25 03:56:54 【问题描述】:我正在尝试从BigQuery 读取查询并将其流式传输到前端。在带有 Express 的 Node.js-land 中,这将是:
app.get('/endpoint', (req, res) =>
bigQuery.createQueryStream(query).pipe(res);
);
但是,createQueryStream()
确实不创建 Node.js 流,而是返回 表行 的自定义流对象,因此它失败了:
(node:21236) UnhandledPromiseRejectionWarning: TypeError [ERR_INVALID_ARG_TYPE]: 第一个参数必须是字符串或缓冲区类型之一。接收到的类型对象
这在official documentation中得到了证实:
bigquery.createQueryStream(query)
.on('data', function(row)
// row is a result from your query.
)
那么,有没有办法将 BigQuery 数据流式传输到前端?我想到了两种可能的解决方案,但想知道是否有人知道更好的方法:
JSON.stringify()
行并返回 JSONL
而不是普通的 JSON
。这增加了解码它的前端负担,但在双方都相当容易。
移至REST API 并使用request 进行实际流式传输,例如:request(url, body: query, params ).pipe(res)
(或任何特定的API,还没有在那里挖掘)。
我很困惑,一个说它可以流式传输的 Node.js 库不适用于 Node.js 原生流,但情况似乎是这样。
【问题讨论】:
你找到解决办法了吗? 亲切!我们最终手动构建了一个 JSON,然后以一种非常老套的方式“流式传输”每一行。但是我们试图流式传输大量数据,而 BigQuery 仍然很慢。我们在某个时候(出于其他原因!)转向了 Python,并希望那里的库会更成熟,但不,它实际上差很多。 @tjbandes 这就是我们最终要做的:***.com/a/64746763/12271991 我明白了,感谢您的更新!似乎将结果与[
,
]
连接起来有点违背了流媒体的目的,对吧?我想您正在服务器端节省内存,但客户端仍然需要等待整个数组才能JSON.parse
。
是的,完全同意。我什至不确定我们是否在服务器端节省内存,但我们可能会。另一个小优点是数据以块的形式发送,因此与等待和一次性发送数据相比在最后一个块到达服务器后(较低的后端前端带宽需要)。
【参考方案1】:
BigQuery 旨在与为不同编程语言编写的大量不同客户端库一起使用,因此,它不返回特定于 nodejs 的数据结构,而是返回更通用的结构,这些结构几乎适用于任何结构化编程语言,如物体。回答您的问题,是的,有一种将 BigQuery 数据流式传输到前端的方法,但这是一个相当个人的选择,因为它所需要的只是从一种数据类型转换为另一种数据类型。但是,我想说最直接的方法是致电JSON.stringify()
,您已经提到过。
希望对你有帮助。
【讨论】:
当然,但这是一个 Node.js 库,它说它可以流式传输,所以我认为 it does Node.js streaming 是非常安全的。那么这里的命名很混乱,感谢您的帮助! @Francisco 它执行 Node.js 流,你是对的,但它返回的流是 Node.js 中的标准数据类型。您共享的文档说返回的对象是可读的流对象,因此您可以通过多种方式将其转换为字符串。其中之一就是library 哦,对不起,我的意思是我认为它会做 Node.js 流,但 它不会,你不能.pipe()
直接到 Node.js ReadableStream 因为它'会抛出错误。所以它不是一个 Node.js 流兼容库。这就是我感到困惑的原因,BigQuery 似乎不支持 Node.js 流,它只支持看起来相似但工作方式不同的东西。【参考方案2】:
我们最终完成了一个实现,将 BigQuery 的回复拼接到一个大的 JSON 数组中:
exports.stream = (query, params, res) =>
// Light testing for descriptive errors in the parameters
for (let key in params)
if (typeof params[key] === "number" && isNaN(params[key]))
throw new TypeError(`The parameter "$key" should be a number`);
return new Promise((resolve, reject) =>
let prev = false;
const onData = row =>
try
// Only handle it when there's a row
if (!row) return;
// There was a previous row written before, so add a comma
if (prev)
res.write(",");
res.write(stringify(row));
prev = true;
catch (error)
console.error("Cannot parse row:", error);
// Just ignore it, don't write this frame
;
const onEnd = () =>
res.write("]");
res.end();
resolve();
;
res.writeHead(200, "Content-Type": "application/json" );
res.write("[");
bigQuery
.createQueryStream( query, params )
.on("error", reject)
.on("data", onData)
.on("end", onEnd);
);
;
它将通过拼接构建一个大型 JSON 数组:
[ // <- First character sent
stringify(row1) // <- First row
, // <- add comma on second row iteration
stringify(row2) // <- Second row
...
stringify(rowN) // <- Last row
] // <- Send the "]" character to close the array
这样有好处:
数据会尽快发送,因此带宽需求较低。 (取决于 BigQuery 实现)服务器端的内存需求较低,因为并非所有数据都同时保存在内存中,只有小块。【讨论】:
以上是关于使用 Express 将 BigQuery 流式传输到前端的主要内容,如果未能解决你的问题,请参考以下文章
Google BigQuery - 将数据流式传输到 BigQuery
使用java将json数据流式传输到Bigquery中。不使用作业加载数据