增量加载和 BigQuery

Posted

技术标签:

【中文标题】增量加载和 BigQuery【英文标题】:Incremental loading and BigQuery 【发布时间】:2021-10-25 13:57:38 【问题描述】:

我正在编写增量加载管道以将数据从 mysql 加载到 BigQuery,并使用 Google Cloud Datastore 作为元数据存储库。

我当前的管道是这样写的:

PCollection<TableRow> tbRows = 
pipeline.apply("Read from MySQL",
        JdbcIO.<TableRow>read().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("com.mysql.cj.jdbc.Driver", connectionConfig)
                .withUsername(username)
                .withPassword(password)
                .withQuery(query).withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow())))
    .setCoder(NullableCoder.of(TableRowJsonCoder.of()));

tbRows.apply("Write to BigQuery",
            BigQueryIO.writeTableRows().withoutValidation()
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND).to(outputTable));

tbRows.apply("Getting timestamp column",
                MapElements.into(TypeDescriptors.strings())
                        .via((final TableRow row) -> (String) row.get(fieldName)))
                .setCoder(NullableCoder.of(StringUtf8Coder.of())).apply("Max", Max.globally())
                .apply("Updating Datastore", ParDo.of(new DoFn<String, String>() 
                    @ProcessElement
                    public void processElement(final ProcessContext c) 
                        DatastoreConnector.udpate(table, c.element());
                    
                ));

我面临的问题是,当 BigQuery 写入步骤失败时,数据存储区仍在更新,有没有办法在更新数据存储区之前等待 BigQuery 写入完成?

谢谢!

【问题讨论】:

【参考方案1】:

目前这不能在与BigQueryIO.writeTableRows() 相同的管道中完成,因为它会产生终端输出 (PDone)。不过我有一些建议。

我怀疑 BigQuery 写入失败是一种罕见的情况。在这种情况下,您是否可以从辅助作业/流程中删除相应的 Datastore 数据。 您是否考虑过更适合写入增量更改数据的 CDC 解决方案。例如,请参阅数据流模板 here。

【讨论】:

以上是关于增量加载和 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章

Azure Synapse - 增量数据加载

XOR EAX, 2(和加载/存储/指针增量)在哪里常用?

增量加载 s3 文件夹文件

“增量负载”是啥意思?

ListMapset的加载因子,默认初始容量和扩容增量

ListMapset的加载因子,默认初始容量和扩容增量