使用 Dataflow 在 BigQuery 表之间进行流式更新
Posted
技术标签:
【中文标题】使用 Dataflow 在 BigQuery 表之间进行流式更新【英文标题】:Streaming updates between BigQuery tables with Dataflow 【发布时间】:2019-03-06 07:43:41 【问题描述】:尝试在 Cloud Dataflow Job 中启用流式传输,这需要从一个 BigQuery 表中读取数据并以附加模式将其写入另一个 BigQuery 表。
为此,我在 Java 代码中启用了options.setStreaming(true);
。
应用窗口概念 - 固定窗口选项(如下代码),
PCollection<TableRow> fixedWindowedItems = finalRecords.apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));
最后使用 BigQueryIO(如下代码)将数据写入 BigQuery 表,
fixedWindowedItems.apply(BigQueryIO.writeTableRows()
.withSchema(schema1)
.to(options.getTargetTable())
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
代码运行良好。没有错误。第一次将数据从一个表移动到另一个表。但是,如果您在第一个表中插入新数据,则第二个表不会得到反映。尽管 Job 类型为 Streaming,但 Job 似乎以 Succeeded 状态完成。
如果我在代码/配置级别错过了启用流媒体模式的内容,能否告诉我。
【问题讨论】:
您能解释一下保持两个 BigQuery 表同步的动机吗? - 不支持从 BigQuery 读取作为流式源。它仅用作批处理源。发生的事情是您正在从一个表读取一批数据到另一个表。 - 所以我倾向于问 agian:您为什么对不断地将数据从一个 BQ 表移动到另一个表感兴趣? 【参考方案1】:初步答案:
您正在寻找的功能是 BigQuery 输出更改流,并且该流应用于另一个 BigQuery 表,对吗?这不是 Apache Beam / Dataflow BigQuery 源提供的。
您的管道运行并完成,因为它将 BigQuery 表中的一批数据复制/查询到另一个表中。
您为什么要让两个 BQ 表保持同步?如果你解释你的场景,我们可以一起改进它。
【讨论】:
感谢您的回复。我的印象是,任何源都可以用作 Dataflow 的输入/输出,同时具有批处理/流式处理方法。现在我可以理解,对于 Dataflow 流,我们需要像 Pub-sub、Kafka 这样的流平台提供输入。我们可以使用 Cloud SQL 或平面文件的数据流流式传输输入吗?以上是关于使用 Dataflow 在 BigQuery 表之间进行流式更新的主要内容,如果未能解决你的问题,请参考以下文章
如何使用在 Dataflow 执行期间计算的架构写入 BigQuery?
在 Dataflow 中从 BigQuery 写入云存储时如何设置文件大小而不是分片数
在 Dataflow 中使用啥转换来合并具有不同列的 csv 文件,同时将它们加载到 BigQuery?
如何使用 python 将字典写入 Dataflow 中的 Bigquery