从数据流管道写入 BQ 时的动态表名

Posted

技术标签:

【中文标题】从数据流管道写入 BQ 时的动态表名【英文标题】:Dynamic table name when writing to BQ from dataflow pipelines 【发布时间】:2016-03-14 04:13:44 【问题描述】:

作为以下问答的后续问题:

https://***.com/questions/31156774/about-key-grouping-with-groupbykey

我想与谷歌数据流工程团队 (@jkff) 确认 Eugene 提出的第三个选项是否完全适用于谷歌数据流:

“有一个 ParDo 接受这些键并创建 BigQuery 表,另一个 ParDo 接受数据并将数据流写入表”

我的理解是 ParDo/DoFn 会处理每个元素,当从 ParDo/DoFn 的 processElement 写出时,我们如何指定表名(从侧面输入传入的键的功能)?

谢谢。

已更新,带有 DoFn,由于 c.element().value 不是 pcollection,因此显然无法正常工作。

PCollection<KV<String, Iterable<String>>> output = ...;

public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> 

private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) 
        this.keysAsSideinputs = keysAsSideinputs;
    

@Override
    public void processElement(ProcessContext c) 
        List<String> keys = c.sideInput(keysAsSideinputs);
        String key = c.element().getKey();

        //the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
        c.element().getValue().apply(Pardo.of(new FormatLineFn()))
                .apply(TextIO.Write.to(key));

        c.output(1);
        
    

【问题讨论】:

现在可以在开箱即用的最新 Beam ***.com/questions/43505534/… 中使用 【参考方案1】:

BigQueryIO.Write 转换不支持这一点。您可以做的最接近的事情是使用每个窗口的表格,并使用自定义 WindowFn 对您在窗口对象中选择表格所需的任何信息进行编码。

如果您不想这样做,可以直接从 DoFn 进行 BigQuery API 调用。有了这个,您可以将表名设置为您想要的任何内容,如您的代码计算的那样。这可以从侧面输入中查找,或者直接从 DoFn 当前正在处理的元素中计算出来。为避免对 BigQuery 进行过多的小调用,您可以使用 finishBundle() 批量处理请求;

您可以在此处查看 Dataflow 运行程序如何进行流式导入: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java

【讨论】:

感谢您的回复。但是,我仍然无法掌握 DoFn 在 processElement 中调用 BigQueryIO.Write 的语法。我在两件事上需要帮助: 1. 你能给我看一个关于上述用法的快速 DoFn 示例吗? 2. 在 processElement 中调用 BigQueryIO.Write 是否会导致额外的性能问题,因为这将在每个元素处理中调用?谢谢。 还在OP中添加了DoFn,请分享如何根据PCollection>中的键写出值。谢谢。 感谢丹尼尔更新答案。真的希望可以向 Text 和 BQ IO writer 添加一个新功能,以允许动态命名的文件或表。并不是说这将是一项简单的任务,而是真正有用的功能。我已经接受了答案。

以上是关于从数据流管道写入 BQ 时的动态表名的主要内容,如果未能解决你的问题,请参考以下文章

如何在谷歌云数据流管道中传递动态参数

在数据流中完成 BQ 写入后的 Apache Beam 写入状态信息

数据流 bigquery 单元测试

从管道读取()保证在EOF之前提供所有原子写入的数据?

将批量数据写入 bigQuery

BQ shell 使用 write_disposition 作为写入附加加载数据存储时出错