Apache Beam 使用多个表时的写入次数

Posted

技术标签:

【中文标题】Apache Beam 使用多个表时的写入次数【英文标题】:Apache Beam how many writes when using multiple tables 【发布时间】:2019-12-21 13:26:42 【问题描述】:

我正在使用 Apache Beam 从 PubSub 读取消息并将它们写入 BigQuery。我要做的是根据输入中的信息写入多个表。为了减少写入量,我对来自 PubSub 的输入使用窗口化。

一个小例子:

messages
    .apply(new PubsubMessageToTableRow(options))
    .get(TRANSFORM_OUT)
    .apply(ParDo.of(new CreateKVFromRow())
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))))
    // group by key
    .apply(GroupByKey.create())
    // Are these two rows what I want?
    .apply(Values.create())
    .apply(Flatten.iterables())
    .apply(BigQueryIO.writeTableRows()
          .withoutValidation()
          .withCreateDisposition(CreateDisposition.CREATE_NEVER)
          .withWriteDisposition(WriteDisposition.WRITE_APPEND)
          .withExtendedErrorInfo()
          .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
          .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) input -> 
                                        // Simplified for readability
                                        Integer destination = (Integer) input.getValue().get("key");
                                        return new TableDestination(
                                                new TableReference()
                                                        .setProjectId(options.getProjectID())
                                                        .setDatasetId(options.getDatasetID())
                                                        .setTableId(destination + "_Table"),
                                                "Table Destination");
                                    ));

我在文档中找不到任何内容,但我想知道每个窗口执行了多少次写入?如果这些是多个表,是否为窗口中所有元素的每个表写入一个?还是每个元素一次,因为每个表可能因每个元素而异?

【问题讨论】:

【参考方案1】:

由于您使用 PubSub 作为来源,您的作业似乎是流式作业。因此,默认的插入方法是STREAMING_INSERTS(参见docs)。我看不到使用这种方法减少写入的任何好处或理由,因为 billig 是基于数据的大小。顺便说一句,您的示例或多或少并没有真正有效地减少写入。

虽然它是一个流式作业,但由于一些版本也支持FILE_LOADS 方法。如果withMethod 设置为FILE_LOADS,您可以在BigQueryIO 上定义withTriggeringFrequency。此设置定义加载作业发生的频率。在这里,连接器为您处理所有事情,您无需按键或窗口数据进行分组。将为每个表启动一个加载作业。

由于您的数据在 BigQuery 中需要一些时间似乎完全没问题,因此我建议您使用 FILE_LOADS,因为与流式插入相比,加载是免费的。定义触发频率时请注意quotas。

【讨论】:

以上是关于Apache Beam 使用多个表时的写入次数的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Apache Beam 中写入多个文件?

Apache Beam:将具有对象列表的对象转换为多个 TableRows 以写入 BigQuery

如何使用 Apache Beam Python 将输出写入动态路径

将 Spark DataFrame 写入 Hive 表时的内存分配问题

使用 apache beam 中的 beam.io.gcp.bigquery.WriteToBigQuery 模块写入日期分区的 Bigquery 表

使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS