How to run multiple WriteToBigQuery in parallel in Google Cloud Dataflow / Apache Beam?

Posted: 2018-09-06 15:08:49

Given the following data, I want to split the events out of a stream of mixed events:

{"type": "A", "k1": "v1"}
{"type": "B", "k2": "v2"}
{"type": "C", "k3": "v3"}

In BigQuery, I want to route type: A events to table A, type: B events to table B, and type: C events to table C.
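The routing rule itself is simple and can be sketched in plain Python first (a toy illustration of the dispatch logic only, not Beam code):

```python
import json

def route_events(lines):
    """Bucket newline-delimited JSON events by their "type" field."""
    buckets = {"A": [], "B": [], "C": []}
    for line in lines:
        event = json.loads(line)
        if event.get("type") in buckets:
            buckets[event["type"]].append(event)
    return buckets

raw = [
    '{"type": "A", "k1": "v1"}',
    '{"type": "B", "k2": "v2"}',
    '{"type": "C", "k3": "v3"}',
]

buckets = route_events(raw)
print(buckets["A"])  # → [{'type': 'A', 'k1': 'v1'}]
```

The Beam pipeline below expresses the same dispatch with tagged outputs, one tag per destination table.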

Here is the code I wrote with the Apache Beam Python SDK to split the events and write them to BigQuery:

import argparse
import json

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

A_schema = 'type:string, k1:string'
B_schema = 'type:string, k2:string'
C_schema = 'type:string, k2:string'

class ParseJsonDoFn(beam.DoFn):
    A_TYPE = 'tag_A'
    B_TYPE = 'tag_B'
    C_TYPE = 'tag_C'
    def process(self, element):
        text_line = element.strip()
        data = json.loads(text_line)

        if data['type'] == 'A':
            yield pvalue.TaggedOutput(self.A_TYPE, data)
        elif data['type'] == 'B':
            yield pvalue.TaggedOutput(self.B_TYPE, data)
        elif data['type'] == 'C':
            yield pvalue.TaggedOutput(self.C_TYPE, data)

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                      dest='input',
                      default='data/path/data',
                      help='Input file to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
      '--runner=DirectRunner',
      '--project=project-id',
      '--job_name=seperate-bi-events-job',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText(known_args.input)

        # Tag each parsed element so it can be routed to a per-type output.
        multiple_lines = (
            lines
            | 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
                ParseJsonDoFn.A_TYPE,
                ParseJsonDoFn.B_TYPE,
                ParseJsonDoFn.C_TYPE)))

        a_line = multiple_lines.tag_A
        b_line = multiple_lines.tag_B
        c_line = multiple_lines.tag_C

        (a_line
            | "output_a" >> beam.io.WriteToBigQuery(
                'temp.a',
                schema=A_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

        (b_line
            | "output_b" >> beam.io.WriteToBigQuery(
                'temp.b',
                schema=B_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

        (c_line
            | "output_c" >> beam.io.WriteToBigQuery(
                'temp.c',
                schema=C_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
    # The `with` block runs the pipeline and waits for completion on exit,
    # so no explicit p.run().wait_until_finish() is needed.

Output:

INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.

But there are two problems here:

1. There is no data in BigQuery?
2. Judging from the logs, the code does not seem to run in parallel, but rather runs three times sequentially?

Is there something wrong with my code, or am I missing something?

Comments:

Answer 1:

There is no data in BigQuery?

Your code looks perfectly fine for writing data into BigQuery (although C_schema should be k3 instead of k2). Keep in mind that you are streaming the data, so you won't see it by clicking the table's Preview button until the data in the streaming buffer is committed. Running a SELECT * query will show the expected results:

Judging from the logs, the code does not run in parallel, but rather runs three times sequentially?

OK, this is interesting. Tracing the WARNING message in the Beam source code, we can see the following:

# if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
# the table before this point.
if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
  # BigQuery can route data to the old table for 2 mins max so wait
  # that much time before creating the table and writing it
  logging.warning('Sleeping for 150 seconds before the write as ' +
                  'BigQuery inserts can be routed to deleted table ' +
                  'for 2 mins after the delete and create.')
  # TODO(BEAM-2673): Remove this sleep by migrating to load api
  time.sleep(150)
  return created_table
else:
  return created_table

After reading BEAM-2673 and BEAM-2801, this appears to be an issue with the BigQuery sink using the Streaming API together with the DirectRunner. It causes the process to sleep for 150 seconds when re-creating each table, and this is not done in parallel.
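As a possible workaround (an assumption on my part: it requires a Beam SDK version in which WriteToBigQuery exposes a method parameter), the sink can be told to use BigQuery load jobs instead of streaming inserts, which is the migration the TODO in the source refers to. A sketch of one of the three writes, reusing a_line and A_schema from the question:

```python
# Hypothetical sketch: newer Beam SDKs let WriteToBigQuery use load jobs
# (FILE_LOADS) instead of streaming inserts, avoiding the 150 s sleep.
(a_line
    | "output_a" >> beam.io.WriteToBigQuery(
        'temp.a',
        schema=A_schema,
        method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
```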

Instead, if we run it on Dataflow (using DataflowRunner, providing staging and temp bucket paths, and loading the input data from GCS), it will run the three import jobs in parallel. See the image below: all three start at 22:19:45 and finish at 22:19:56:
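For reference, switching the runner amounts to changing the pipeline arguments along these lines (the bucket path is a placeholder of mine, not from the original post):

```python
# Hypothetical placeholder values; substitute your own project and bucket.
pipeline_args.extend([
    '--runner=DataflowRunner',
    '--project=project-id',
    '--staging_location=gs://your-bucket/staging',
    '--temp_location=gs://your-bucket/temp',
    '--job_name=separate-bq-events-job',
])
```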

Comments:

Thanks for your answer; however, some time after the job finished, I could not find any data in BigQuery temp.a. I ran select * from temp.a limit 100; and there is no data in this table. However, the script log shows INFO:root:finish <DoOperation output_a/WriteToBigQuery output_tags=['out'], receivers=[ConsumerSet[output_a/WriteToBigQuery.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]> DEBUG:root:Successfully wrote 2 rows. To make my question clearer, I asked a follow-up question at ***.com/questions/52254637/… here.
