如何使用createWriteStream将JSON流式传输到BigQuery表?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用createWriteStream将JSON流式传输到BigQuery表?相关的知识,希望对你有一定的参考价值。

我有一个大的JSON文件,我想稍微转换并作为新表发送到Google BigQuery。我在过去使用node.js中的流效果很好,我认为这是解决这个问题的一个不错的解决方案。我正在使用official Google node.js BigQuery API。我能够创建一个没有问题的正确模式的表。我想我准备好了解决方案。该程序完成正常,但没有数据最终登陆我的BigQuery表。

相关代码如下

我的node.js流代码:

fs.createReadStream('testfile.json')
        .pipe(require('split')())
        .pipe(require('event-stream').mapSync((data) => {
            if (data.length > 1) {
                let obj;
                try {
                    obj = JSON.parse('{' + (data[data.length - 1] === ',' ? data.slice(0, data.length - 1) : data) + '}');
                } catch (e) {
                    console.error('error parsing!', e, data);
                }
                let user = Object.keys(obj)[0];
                let company = obj[user][0];
                let item = {
                    user: user,
                    company: company
                };
                console.log(item);
                return JSON.stringify(item);
            }
        }))
        .pipe(table.createWriteStream('json'))
        .on('error', (e) => {
            console.error('Error!', e);
        })
        .on('complete', (job) => {
            console.log('All done!', job);
        });

testfile.json看起来像这样:

{
  "a":["a company", "1234567"],
  "b":["b company", "1234214"],
  "c":["c company", "12332231"]
}

当我运行程序时输出如下:

{ user: 'a', company: 'a company' }
{ user: 'b', company: 'b company' }
{ user: 'c', company: 'c company' }
All done! Job {
  metadata:
   { kind: 'bigquery#job',
   /* lots more data here */

docs for createWriteStream并不是非常详细的数据应该是什么格式进入写入流,所以我觉得我有点盲目飞行。

答案

找出我需要做的事情a)使导入工作和b)更好地了解正在发生的事情。

修复导入

  1. 指定您将向createWriteStream提供换行符分隔的JSON文件: let firehose = table.createWriteStream({ sourceFormat: 'NEWLINE_DELIMITED_JSON' });

  1. 确保JSON转换器返回换行符分隔的JSON: return JSON.stringify(item) + ' ';

对流和工作状态的可见性

firehose writeStreamerrorcomplete事件,你可以订阅,但桌子的writeStreamcomplete事件提供了一个参数作为一个参数,本身有更多的事件,你可以订阅,以获得更多的洞察力。

let moment = require('moment');
firehose.on('error', (e) => {
    console.error('firehose error!', e);
});
firehose.on('complete', (job) => {
    console.log('Firehose into BigQuery emptied! BigQuery Job details:', job.metadata.status.state, job.metadata.jobReference.jobId);
    console.log('Now we wait for the Job to finish...');
    job.on('complete', (job) => {
        console.log('BigQuery Job loaded', job.statistics.load.inputFileBytes, 'bytes yielding', job.statistics.load.outputRows, 'rows and', job.statistics.load.badRecords, 'bad records in', moment(parseInt(job.statistics.endTime)).from(moment(parseInt(job.statistics.startTime)), true));
    });
    job.on('error', (e) => { console.error('Job error', e); });
});

以上是关于如何使用createWriteStream将JSON流式传输到BigQuery表?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用createWriteStream将JSON流式传输到BigQuery表?

使用 Fetch API 和 fs.createWriteStream 对文件进行流式响应

fs.createwritestream 不是函数

fs.createWriteStream 停顿

createWriteStream 的 ('error') 上的开玩笑单元测试

我可以从 fs.createWriteStream() 获取缓冲区吗?