从数据流管道写入 BQ 时的动态表名
Posted
技术标签:
【中文标题】从数据流管道写入 BQ 时的动态表名【英文标题】:Dynamic table name when writing to BQ from dataflow pipelines 【发布时间】:2016-03-14 04:13:44 【问题描述】:作为以下问答的后续问题:
https://***.com/questions/31156774/about-key-grouping-with-groupbykey
我想与谷歌数据流工程团队 (@jkff) 确认 Eugene 提出的第三个选项是否完全适用于谷歌数据流:
“有一个 ParDo 接受这些键并创建 BigQuery 表,另一个 ParDo 接受数据并将数据流写入表”
我的理解是 ParDo/DoFn 会处理每个元素,当从 ParDo/DoFn 的 processElement 写出时,我们如何指定表名(从侧面输入传入的键的功能)?
谢谢。
已更新,带有 DoFn,由于 c.element().value 不是 pcollection,因此显然无法正常工作。
PCollection<KV<String, Iterable<String>>> output = ...;
public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer>
private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs)
this.keysAsSideinputs = keysAsSideinputs;
@Override
public void processElement(ProcessContext c)
List<String> keys = c.sideInput(keysAsSideinputs);
String key = c.element().getKey();
//the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
c.element().getValue().apply(Pardo.of(new FormatLineFn()))
.apply(TextIO.Write.to(key));
c.output(1);
【问题讨论】:
现在可以在开箱即用的最新 Beam ***.com/questions/43505534/… 中使用 【参考方案1】:BigQueryIO.Write 转换不支持这一点。您可以做的最接近的事情是使用每个窗口的表格,并使用自定义 WindowFn 对您在窗口对象中选择表格所需的任何信息进行编码。
如果您不想这样做,可以直接从 DoFn 进行 BigQuery API 调用。有了这个,您可以将表名设置为您想要的任何内容,如您的代码计算的那样。这可以从侧面输入中查找,或者直接从 DoFn 当前正在处理的元素中计算出来。为避免对 BigQuery 进行过多的小调用,您可以使用 finishBundle() 批量处理请求;
您可以在此处查看 Dataflow 运行程序如何进行流式导入: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
【讨论】:
感谢您的回复。但是,我仍然无法掌握 DoFn 在 processElement 中调用 BigQueryIO.Write 的语法。我在两件事上需要帮助: 1. 你能给我看一个关于上述用法的快速 DoFn 示例吗? 2. 在 processElement 中调用 BigQueryIO.Write 是否会导致额外的性能问题,因为这将在每个元素处理中调用?谢谢。 还在OP中添加了DoFn,请分享如何根据PCollection以上是关于从数据流管道写入 BQ 时的动态表名的主要内容,如果未能解决你的问题,请参考以下文章