使用 Apache Beam 向 BigQuery 传播插入时如何指定 insertId

Posted

技术标签:

【中文标题】使用 Apache Beam 向 BigQuery 传播插入时如何指定 insertId【英文标题】:How to specify insertId when spreaming insert to BigQuery using Apache Beam 【发布时间】:2019-06-04 06:07:27 【问题描述】:

BigQuery 支持流式插入的重复数据删除。如何使用 Apache Beam 使用此功能?

https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency

为帮助确保数据一致性,您可以为每个插入的行提供 insertId。 BigQuery 会记住此 ID 至少一分钟。如果您尝试在该时间段内流式传输同一组行并设置了 insertId 属性,BigQuery 会使用 insertId 属性尽最大努力对您的数据进行重复数据删除。您可能必须重试插入,因为在某些错误条件下无法确定流式插入的状态,例如系统和 BigQuery 之间的网络错误或 BigQuery 中的内部错误。如果您重试插入,请对同一组行使用相同的 insertId,以便 BigQuery 可以尝试对您的数据进行重复数据删除。有关详细信息,请参阅流式插入疑难解答。

我在 Java 文档中找不到这样的功能。 https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html

在this question,他建议在TableRow 中设置insertId。这是正确的吗?

https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableRow.html?is-external=true

BigQuery 客户端库具有此功能。

https://googleapis.github.io/google-cloud-java/google-cloud-clients/apidocs/index.html?com/google/cloud/bigquery/package-summary.html https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-clients/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/InsertAllRequest.java#L134

【问题讨论】:

您能否详细说明您的用例?与 BigQuery 结合使用时,Dataflow/Beam 应该只执行一次,而无需手动指定 insertId。 上面提到了我的用例。想要在插入 BigQuery 时进行重复数据删除。所以只需将 insertId 指定为新行中的列? 我了解您想要删除重复数据。但根据重复的来源,这可能已经是一个已解决的问题。 数据源端没有重复。由于Kafka默认支持至少一次交付,所以我认为Kafka生产者和消费者之间存在重复的可能性。而且我猜数据流可能会在重试某些错误(例如时间网络问题)时多次插入同一行。所以我只想知道如何避免两者的重复。这个问题是关于从数据流到 bigquery 的流插入。 在我的实际用例中,重复数据删除的要求不是那么强。所以我认为最简单的方法就是插入 Big Query,然后对查询进行重复数据删除。但我只想知道 BigQueryIO (Apache Beam) 支持重复数据删除功能。 【参考方案1】:

Pub/Sub + Beam/Dataflow + BigQuery:应该保证“Exactly once”,您不必为此担心太多。现在,当您要求 Dataflow 使用 FILE_LOADS 而不是 STREAMING_INSERTS 插入 BigQuery 时,这种保证会更强。

Kafka + Beam/Dataflow + BigQuery:如果一条消息可以从 Kafka 发出多次(例如,如果生产者重试插入),那么您需要注意 de -复制。在 BigQuery 中(根据您的评论目前已实施),或在带有 .apply(Distinct.create()) 转换的 Dataflow 中。

【讨论】:

谢谢!但我最初的问题是如何使用 Apache Beam 的 BigQuery 重复数据删除功能。 您不能手动操作,因为 Dataflow 已经在为自己使用 insertId 来实现所描述的“恰好一次”。 好的,我明白了。谢谢你的澄清。 感谢您的提问!我不得不请教一些专家才能得到这个答案:)。包括 Pablo,他改进了我上面的答案 我在 Apache Beam 文档中找不到关于 .apply(Distinct.create()) 转换的信息。因此,如果您能在文档中提及它,将会很有帮助。【参考方案2】:

正如 Felipe 在评论中提到的那样,Dataflow 似乎已经在使用 insertId 来实现“恰好一次”。所以我们不能手动指定insertId。

【讨论】:

以上是关于使用 Apache Beam 向 BigQuery 传播插入时如何指定 insertId的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Apache BEAM 在 BigQuery 中执行快速联接

使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS

Apache Beam 数据流 BigQuery

如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表

结合 BigQuery 和 Pub/Sub Apache Beam

使用 apache beam Json Time Partitioning 在 bigquery 中创建时间分区表