使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS

Posted

技术标签:

【中文标题】使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS【英文标题】:Write BigQuery results to GCS in CSV format using Apache Beam 【发布时间】:2018-10-22 12:27:40 【问题描述】:

我是 Apache Beam 的新手,我正在尝试编写一个管道以从 Google BigQuery 中提取数据并使用 Python 以 CSV 格式将数据写入 GCS。

使用 beam.io.read(beam.io.BigQuerySource()) 我可以从 BigQuery 读取数据,但不知道如何以 CSV 格式将其写入 GCS。

是否有自定义功能可以实现相同的功能,您能帮帮我吗?

import logging

import apache_beam as beam
from apache_beam.io.BigQueryDisposition import CREATE_IF_NEEDED
from apache_beam.io.BigQueryDisposition import WRITE_TRUNCATE

PROJECT='project_id'
BUCKET='project_bucket'


def run():
    argv = [
        '--project=0'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://0/staging/'.format(BUCKET),
        '--temp_location=gs://0/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    with beam.Pipeline(argv=argv) as p:

        # Execute the SQL in big query and store the result data set into given Destination big query table.
        BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
            beam.io.BigQuerySource(query =  'Select * from `dataset.table`', use_standard_sql=True))
        # Extract data from Bigquery to GCS in CSV format.
        # This is where I need your help

        BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
                table='tablename',
                dataset='datasetname',
                project='project_id',
                schema='name:string,gender:string,count:integer',
                create_disposition=CREATE_IF_NEEDED,
                write_disposition=WRITE_TRUNCATE)

if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   run()

【问题讨论】:

欢迎来到Stack Overflow!请拨打tour 并访问help center 以充分利用本网站。还请分享您迄今为止开发的relevant parts of the code。这有助于找出问题所在。 【参考方案1】:

您可以使用WriteToText 添加.csv 后缀和headers。考虑到您需要将查询结果解析为 CSV 格式。例如,我使用了莎士比亚公共dataset 和以下查询:

SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10

我们现在读取查询结果:

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))

BQ_DATA 现在包含键值对:

u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407
u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319
u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313

我们可以应用beam.Map 函数来只产生值:

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

摘自BQ_VALUES

[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]

最后再次映射以使所有列值用逗号而不是列表分隔(考虑到如果双引号可以出现在字段中,则需要转义它们):

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))

现在我们用后缀和标题将结果写入 GCS:

BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://0/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')

书面结果:

$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"

【讨论】:

在同一个例子中,如果我必须使用子进程(GSUTIL 命令)将文件从 GCS 写入本地目录,我将如何在管道中实现。 应该注意这个解决方案依赖于键排序,因为x.values() 使用的是隐式排序。对于 python 3.7 及更高版本(***.com/questions/1867861/…),这是一个合理的假设,但不确定BigQuerySource 是否也能保证早期版本也是如此。【参考方案2】:

对于希望使用 Python 3 进行更新的任何人,请替换以下行

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: list(x.values()))

【讨论】:

以上是关于使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam 批量到 BigQuery,中间文件,它们是不是仅以 JSON 格式生成

有没有办法使用 ReadFromText 转换(Python)在 Apache Beam 中读取多行 csv 文件?

我们如何使用 python sdk 在 Apache Beam 中读取带有附件的 CSV 文件?

Python Apache Beam 侧输入断言错误

输出类型中 beam.ParDo 和 beam.Map 的区别?

如何将 DoFn PTransform 应用于 Apache Beam 中的 PCollectionTuple