Apache Beam 使用多个表时的写入次数
Posted
技术标签:
【中文标题】Apache Beam 使用多个表时的写入次数【英文标题】:Apache Beam how many writes when using multiple tables 【发布时间】:2019-12-21 13:26:42 【问题描述】:我正在使用 Apache Beam
从 PubSub 读取消息并将它们写入 BigQuery。我要做的是根据输入中的信息写入多个表。为了减少写入量,我对来自 PubSub 的输入使用窗口化。
一个小例子:
messages
.apply(new PubsubMessageToTableRow(options))
.get(TRANSFORM_OUT)
.apply(ParDo.of(new CreateKVFromRow())
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(10L))))
// group by key
.apply(GroupByKey.create())
// Are these two rows what I want?
.apply(Values.create())
.apply(Flatten.iterables())
.apply(BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) input ->
// Simplified for readability
Integer destination = (Integer) input.getValue().get("key");
return new TableDestination(
new TableReference()
.setProjectId(options.getProjectID())
.setDatasetId(options.getDatasetID())
.setTableId(destination + "_Table"),
"Table Destination");
));
我在文档中找不到任何内容,但我想知道每个窗口执行了多少次写入?如果这些是多个表,是否为窗口中所有元素的每个表写入一个?还是每个元素一次,因为每个表可能因每个元素而异?
【问题讨论】:
【参考方案1】:由于您使用 PubSub 作为来源,您的作业似乎是流式作业。因此,默认的插入方法是STREAMING_INSERTS
(参见docs)。我看不到使用这种方法减少写入的任何好处或理由,因为 billig 是基于数据的大小。顺便说一句,您的示例或多或少并没有真正有效地减少写入。
虽然它是一个流式作业,但由于一些版本也支持FILE_LOADS
方法。如果withMethod
设置为FILE_LOADS
,您可以在BigQueryIO
上定义withTriggeringFrequency
。此设置定义加载作业发生的频率。在这里,连接器为您处理所有事情,您无需按键或窗口数据进行分组。将为每个表启动一个加载作业。
由于您的数据在 BigQuery 中需要一些时间似乎完全没问题,因此我建议您使用 FILE_LOADS
,因为与流式插入相比,加载是免费的。定义触发频率时请注意quotas。
【讨论】:
以上是关于Apache Beam 使用多个表时的写入次数的主要内容,如果未能解决你的问题,请参考以下文章
Apache Beam:将具有对象列表的对象转换为多个 TableRows 以写入 BigQuery
如何使用 Apache Beam Python 将输出写入动态路径
将 Spark DataFrame 写入 Hive 表时的内存分配问题
使用 apache beam 中的 beam.io.gcp.bigquery.WriteToBigQuery 模块写入日期分区的 Bigquery 表