如何使用在 Dataflow 执行期间计算的架构写入 BigQuery?

Posted

技术标签:

【中文标题】如何使用在 Dataflow 执行期间计算的架构写入 BigQuery?【英文标题】:How do I write to BigQuery using a schema computed during Dataflow execution? 【发布时间】:2015-04-03 22:01:41 【问题描述】:

我有以下场景:

    管道 A 在 BigQuery 中查找表 A,进行一些计算并返回列名列表。 此列名称列表用作管道 B 输出的 BigQuery 架构。

您能否告诉我实现这一目标的最佳选择是什么?

管道 A 可以使用 TextIO 将列名列表写入临时或暂存位置文件,然后管道执行程序读取这些文件以定义管道 B 的架构。如果这种方法看起来不错,请告诉我是否有一个 Dataflow 实用程序可以从临时或暂存位置读取文件,或者是否应该使用 GCS API。

【问题讨论】:

【参考方案1】:

您需要执行以下操作:

    构造管道 A 以写入某个位置,例如 GCS(在构造管道 B 时可以引用的任何持久位置都可以)。 使用 BlockingDataflowPipelineRunner 运行并等待管道 A 完成。 通过读取您在步骤 1 中定义的位置,使用架构信息构造管道 B。 运行管道 B。

我不会使用临时位置,因为我们可能会在您开始建造管道 B 之前将其清理干净。可以使用暂存位置(如果与临时位置不同)。我还建议使用唯一的文件名,这样如果管道 A 运行多次,您就不会使用管道 B 读取过时的结果。

这应该可以帮助您读取和写入 GCS: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GcsUtil.java

您可以从 PipelineOptions 对象中获取 GcsUtil 的实例: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/GcsOptions.java#L43

【讨论】:

【参考方案2】:

最新版本的 Apache Beam 可以做到这一点。请在Writing different values to different BigQuery tables in Apache Beam 上查看我的更一般性问题和自我回答。

【讨论】:

以上是关于如何使用在 Dataflow 执行期间计算的架构写入 BigQuery?的主要内容,如果未能解决你的问题,请参考以下文章

从 DataFlow 加载到现有 BigQuery 表时是不是可以更新架构?

意外行为 - TPL DataFlow BatchBlock 在 TriggerBatch 执行时拒绝项目

如何使用 DataFlow 和 Cloud Pub Sub 确保幂等性?

使用 Spring Cloud DataFlow 在无限运行的应用程序中编排长时间运行的外部批处理作业是不是合适?

Flink架构

如何在Apache Beam / Google Dataflow中使用ParseJsons?