数据流 GCS 到 BQ 问题

Posted

技术标签:

【中文标题】数据流 GCS 到 BQ 问题【英文标题】:Dataflow GCS to BQ Problems 【发布时间】:2017-12-14 20:59:11 【问题描述】:

情况是这样的:我在 GCS 中有一组文件,这些文件是经过压缩的,并且我尝试导入的文件扩展名为 .gz(即 000000_[0-5].gz)一个 BQ 表。迄今为止,我一直在从命令行执行命令,但希望通过 Dataflow 来完成此操作,未来可能会添加一些转换。

压缩后的 GCS 文件中的数据是一个复杂的 JSON 结构,经常更改架构,因此最简单的方法是将整个文件作为一个 TSV 带入 BigQuery,只有一列,称为 record,然后使用 JSON_EXTRACT 函数BQ 在需要的时候解析出需要的值。

问题:我已经编写了一个 Dataflow 管道,它在这种情况下会做最少的事情;从 GCS 读取并写入 BigQuery 表。但是,当我执行此管道时,我收到 JSON 解析错误,如下所示:

Error while reading data, error message: JSON table encountered too 
many errors, giving up. Rows: 1; errors: 1., error: Error while reading 
data, error message: JSON table encountered too many errors, giving up. 
Rows: 1; errors: 1., error: Error while reading data, error message: 
JSON parsing error in row starting at position 2630029539: Value 
encountered without start of object.

下面是我的数据流脚本,其中一些变量是匿名的。

from __future__ import absolute_import

import argparse
import logging
import re
import json

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import Read
from apache_beam.io import WriteToText
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None):

  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      default='gs://BUCKET_NAME/input-data/000000_0.gz',
                      help='Input file to process.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      '--runner=DataflowRunner',
      '--project=GCP_PROJECT_NAME',
      '--staging_location=gs://BUCKET_NAME/dataflow-staging',
      '--temp_location=gs://BUCKET_NAME/dataflow-temp',
      '--job_name=gcs-gzcomp-to-bq1',
  ])

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  with beam.Pipeline(options=pipeline_options) as p:

    (p | "ReadFromGCS" >> ReadFromText(known_args.input)
       | WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME',
           project='GCP_PROJECT_NAME', schema='record:string'))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

如您所见,我尝试通过指定仅包含一个字符串类型列的架构来执行与传统加载作业相同的操作,但仍然失败。

有没有办法明确告诉 Dataflow 关于如何导入 GCS 文件的更多详细信息?即指定 TSV,即使它是每一行上的有效 JSON 对象?

此外,如果此错误与我可能搞砸的其他任何事情有关,请同时指出;我是 Dataflow 的超级新手,但对 BQ 和其他一些 GCP 工具非常有经验,因此希望将其添加到我的工具带中。

【问题讨论】:

【参考方案1】:

我认为WriteToBigQuery 的输入集合应该是字典的集合(每个键映射到 BigQuery 列),而不是字符串的集合。尝试通过| beam.Map(lambda line: dict(record=line)) 之类的内容。

【讨论】:

您能否发布此代码,以便我们查看它的工作原理以及您拥有的任何云功能?

以上是关于数据流 GCS 到 BQ 问题的主要内容,如果未能解决你的问题,请参考以下文章

如何将 BigQuery 数据导出到 GCS?

使用 bq 命令行将文件从 GBQ 提取到没有 csv 标头的 GCS

Bq 命令或数据流按原样加载

Airflow DAG - 如何首先检查BQ(必要时删除)然后运行数据流作业?

如何在 GCS 中的增量表之上创建 BQ 外部表并仅显示最新快照

GSUTIL CP 文件延迟