使用 Dataflow 在 BigQuery 表之间进行流式更新

Posted

技术标签:

【中文标题】使用 Dataflow 在 BigQuery 表之间进行流式更新【英文标题】:Streaming updates between BigQuery tables with Dataflow 【发布时间】:2019-03-06 07:43:41 【问题描述】:

尝试在 Cloud Dataflow Job 中启用流式传输,这需要从一个 BigQuery 表中读取数据并以附加模式将其写入另一个 BigQuery 表。

为此,我在 Java 代码中启用了options.setStreaming(true);

应用窗口概念 - 固定窗口选项(如下代码),

PCollection<TableRow> fixedWindowedItems = finalRecords.apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));

最后使用 BigQueryIO(如下代码)将数据写入 BigQuery 表,

fixedWindowedItems.apply(BigQueryIO.writeTableRows()
                .withSchema(schema1)
                .to(options.getTargetTable())
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

代码运行良好。没有错误。第一次将数据从一个表移动到另一个表。但是,如果您在第一个表中插入新数据,则第二个表不会得到反映。尽管 Job 类型为 Streaming,但 Job 似乎以 Succeeded 状态完成。

如果我在代码/配置级别错过了启用流媒体模式的内容,能否告诉我。

【问题讨论】:

您能解释一下保持两个 BigQuery 表同步的动机吗? - 不支持从 BigQuery 读取作为流式源。它仅用作批处理源。发生的事情是您正在从一个表读取一批数据到另一个表。 - 所以我倾向于问 agian:您为什么对不断地将数据从一个 BQ 表移动到另一个表感兴趣? 【参考方案1】:

初步答案:

您正在寻找的功能是 BigQuery 输出更改流,并且该流应用于另一个 BigQuery 表,对吗?这不是 Apache Beam / Dataflow BigQuery 源提供的。

您的管道运行并完成,因为它将 BigQuery 表中的一批数据复制/查询到另一个表中。

您为什么要让两个 BQ 表保持同步?如果你解释你的场景,我们可以一起改进它。

【讨论】:

感谢您的回复。我的印象是,任何源都可以用作 Dataflow 的输入/输出,同时具有批处理/流式处理方法。现在我可以理解,对于 Dataflow 流,我们需要像 Pub-sub、Kafka 这样的流平台提供输入。我们可以使用 Cloud SQL 或平面文件的数据流流式传输输入吗?

以上是关于使用 Dataflow 在 BigQuery 表之间进行流式更新的主要内容,如果未能解决你的问题,请参考以下文章

如何使用在 Dataflow 执行期间计算的架构写入 BigQuery?

在 Dataflow 中从 BigQuery 写入云存储时如何设置文件大小而不是分片数

在 Dataflow 中使用啥转换来合并具有不同列的 csv 文件,同时将它们加载到 BigQuery?

如何使用 python 将字典写入 Dataflow 中的 Bigquery

使用 Dataflow 管道 (python) 将多个 Json zip 文件从 GCS 加载到 BigQuery

使用 DataFlow Engine 运行 bigquery 查询时,如何使用 UDF(和其他函数)?