如何使用 createWriteStream 将 JSON 流式传输到 BigQuery 表?

Posted

技术标签:

【中文标题】如何使用 createWriteStream 将 JSON 流式传输到 BigQuery 表?【英文标题】:How to stream JSON to BigQuery table using createWriteStream? 【发布时间】:2018-03-04 16:49:58 【问题描述】:

我有一个大型 JSON 文件,我想对其进行轻微转换并作为新表发送到 Google BigQuery。我过去在 node.js 中使用过流,效果很好,我认为这是解决这个问题的一个不错的解决方案。我正在使用official Google node.js BigQuery API。我能够毫无问题地创建具有正确架构的表。我认为我已经准备好了解决方案。该程序完成得很好,但没有数据最终落入我的 BigQuery 表中。

相关代码如下

我的 node.js 流代码:

fs.createReadStream('testfile.json')
        .pipe(require('split')())
        .pipe(require('event-stream').mapSync((data) => 
            if (data.length > 1) 
                let obj;
                try 
                    obj = JSON.parse('' + (data[data.length - 1] === ',' ? data.slice(0, data.length - 1) : data) + '');
                 catch (e) 
                    console.error('error parsing!', e, data);
                
                let user = Object.keys(obj)[0];
                let company = obj[user][0];
                let item = 
                    user: user,
                    company: company
                ;
                console.log(item);
                return JSON.stringify(item);
            
        ))
        .pipe(table.createWriteStream('json'))
        .on('error', (e) => 
            console.error('Error!', e);
        )
        .on('complete', (job) => 
            console.log('All done!', job);
        );

testfile.json 看起来像这样:


  "a":["a company", "1234567"],
  "b":["b company", "1234214"],
  "c":["c company", "12332231"]

当我运行程序时,输出如下所示:

 user: 'a', company: 'a company' 
 user: 'b', company: 'b company' 
 user: 'c', company: 'c company' 
All done! Job 
  metadata:
    kind: 'bigquery#job',
   /* lots more data here */

docs for createWriteStream 并没有非常详细地说明数据应该采用什么格式才能泵入写入流,所以我觉得我有点盲目。

【问题讨论】:

【参考方案1】:

找出我需要做的 a) 使导入工作和 b) 更清楚地了解正在发生的事情。

修复导入

    指定您将向createWriteStream 提供一个以换行符分隔的 JSON 文件:

    let firehose = table.createWriteStream(
        sourceFormat: 'NEWLINE_DELIMITED_JSON'
    );
    

    确保 JSON 转换器返回以换行符分隔的 JSON:

    return JSON.stringify(item) + '\n';
    

流和作业状态的可见性

firehose writeStream 具有您可以订阅的 errorcomplete 事件,但表的 writeStreamcomplete 事件提供了一个 Job 作为参数,它本身有更多您可以订阅的事件以获得更多洞察力。

let moment = require('moment');
firehose.on('error', (e) => 
    console.error('firehose error!', e);
);
firehose.on('complete', (job) => 
    console.log('Firehose into BigQuery emptied! BigQuery Job details:', job.metadata.status.state, job.metadata.jobReference.jobId);
    console.log('Now we wait for the Job to finish...');
    job.on('complete', (job) => 
        console.log('BigQuery Job loaded', job.statistics.load.inputFileBytes, 'bytes yielding', job.statistics.load.outputRows, 'rows and', job.statistics.load.badRecords, 'bad records in', moment(parseInt(job.statistics.endTime)).from(moment(parseInt(job.statistics.startTime)), true));
    );
    job.on('error', (e) =>  console.error('Job error', e); );
);

【讨论】:

谢谢你,伙计。 return JSON.stringify(item) + '\n'; 真的为我节省了几个小时,非常感谢。但有一件事,你是怎么想出来的? 对于正在查看事件的新实现的人来说,writeStream 现在有事件job,然后可以通过complete 事件(googleapis.dev/nodejs/bigquery/latest/…) 进行监听

以上是关于如何使用 createWriteStream 将 JSON 流式传输到 BigQuery 表?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用createWriteStream将JSON流式传输到BigQuery表?

使用 Fetch API 和 fs.createWriteStream 对文件进行流式响应

fs.createwritestream 不是函数

fs.createWriteStream 停顿

createWriteStream 的 ('error') 上的开玩笑单元测试

我可以从 fs.createWriteStream() 获取缓冲区吗?