BigQuery MERGE 意外的行重复

Posted

技术标签:

【中文标题】BigQuery MERGE 意外的行重复【英文标题】:BigQuery MERGE unexpected row duplication 【发布时间】:2020-08-20 11:02:56 【问题描述】:

我正在使用标准 SQL MERGE 来更新基于源外部表的常规目标表,该源外部表是存储桶中的一组 CVS 文件。这是一个简化的输入文件:

$ gsutil cat gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv
"id"    "PortfolioCode" "ValuationDate" "load_checksum"
"1"     "CIMDI000TT"    "2020-03-28"    "checksum1"

MERGE 语句是:

MERGE xx_producer_conformed.demo T
USING xx_producer_raw.demo_raw S
ON
    S.id = T.id
WHEN NOT MATCHED THEN
    INSERT (id, PortfolioCode, ValuationDate, load_checksum, insert_time, file_name, extract_timestamp, wf_id)
    VALUES (id, PortfolioCode, ValuationDate, load_checksum, CURRENT_TIMESTAMP(), _FILE_NAME, REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]8_([0-9]14).tsv'),'scheduled__2020-08-19T16:24:00+00:00')
WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
    T.id = S.id, T.PortfolioCode = S.PortfolioCode, T.ValuationDate = S.ValuationDate, T.load_checksum = S.load_checksum, T.file_name = S._FILE_NAME, T.extract_timestamp = REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]8_([0-9]14).tsv'), T.wf_id = 'scheduled__2020-08-19T16:24:00+00:00'

如果我擦除目标表并重新运行 MERGE,我得到的行修改计数为 1:

bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql  |  awk 'ORS=" "')"
Waiting on bqjob_r288f8d33_000001740b413532_1 ... (0s) Current status: DONE
Number of affected rows: 1

这成功导致目标表更新:

$ bq query --format=csv --max_rows=10 --use_legacy_sql=false "select * from ta_producer_conformed.demo"
Waiting on bqjob_r7f6b6a46_000001740b5057a3_1 ... (0s) Current status: DONE
id,PortfolioCode,ValuationDate,load_checksum,insert_time,file_name,extract_timestamp,wf_id
1,CIMDI000TT,2020-03-28,checksum1,2020-08-20 09:44:20,gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv,20200505050505,scheduled__2020-08-19T16:24:00+00:00

如果我再次返回 MERGE,我得到的行修改计数为 0:

$ bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql  |  awk 'ORS=" "')"
Waiting on bqjob_r3de2f833_000001740b4161b3_1 ... (0s) Current status: DONE
Number of affected rows: 0

这不会导致目标表发生任何更改。所以一切正常。

问题是,当我在一个更复杂的示例上运行代码时,我将许多输入文件插入到一个空的目标表中,我最终会得到具有相同 id 的行,其中 count(id) 不等于 @987654328 @:

$ bq query --use_legacy_sql=false --max_rows=999999 --location=asia-east2 "select count(id) as total_records from xx_producer_conformed.xxx; select count(distinct id) as unique_records from xx_producer_conformed.xxx; "
Waiting on bqjob_r5df5bec8_000001740b7dfa50_1 ... (1s) Current status: DONE
select count(id) as total_records from xx_producer_conformed.xxx; -- at [1:1]
+---------------+
| total_records |
+---------------+
|         11582 |
+---------------+
select count(distinct id) as unique_records from xx_producer_conformed.xxx; -- at [1:78]
+----------------+
| unique_records |
+----------------+
|           5722 |
+----------------+

这让我感到惊讶,因为我的期望是底层逻辑将逐步遍历每个底层文件中的每一行,并且只插入第一个 id 然后在任何后续 id 上插入它会更新。所以我的期望是输入存储桶中的行数不能超过唯一的ids。

如果我再次尝试运行 MERGE,它会告诉我目标表中有不止一行具有相同的 id:

$  bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql  |  awk 'ORS=" "')"
Waiting on bqjob_r2fe783fc_000001740b8271aa_1 ... (0s) Current status: DONE
Error in query string: Error processing job 'xxxx-10843454-datamesh-
dev:bqjob_r2fe783fc_000001740b8271aa_1': UPDATE/MERGE must match at most one
source row for each target row

我原以为当 MERGE 语句插入时不会有两行具有相同的“id”。

使用的所有表和查询都是从列出“业务列”的文件中生成的。因此,上面的简单演示示例在登录和 MERGE 语句中的连接方面与全面查询相同。

为什么上面的 MERGE 查询会导致“id”重复的行,我该如何解决这个问题?

【问题讨论】:

【参考方案1】:

通过擦除目标表并复制一个相对较大的输入作为输入,这个问题很容易重复:

AAAA_20200805_20200814200000.tsv
AAAA_clone_20200805_20200814200000.tsv

我相信其核心是并行性。许多文件的单个大型 MERGE 可以并行生成许多工作线程。任何两个并行运行的工作线程加载不同的文件以立即“看到”彼此的插入将非常慢。相反,我希望它们独立运行,而不是“看到”彼此写入单独的缓冲区。当缓冲区最终合并时,它会导致多个插入具有相同的id

为了解决这个问题,我正在使用一些 CTE 根据 extract_timestamp 使用 ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC) 来选择任何 id 的最新记录。然后我们可以按最低值过滤以选择最新版本的记录。完整的查询是:

MERGE xx_producer_conformed.demo T
USING (
    WITH cteExtractTimestamp AS (
        SELECT
            id, PortfolioCode, ValuationDate, load_checksum
            , _FILE_NAME
            , REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]8_([0-9]14).tsv') AS extract_timestamp
        FROM
            xx_producer_raw.demo_raw
    ),
    cteRanked AS (
        SELECT
            id, PortfolioCode, ValuationDate, load_checksum
            , _FILE_NAME
            , extract_timestamp
            , ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC) AS row_num
        FROM 
            cteExtractTimestamp
    )
    SELECT 
        id, PortfolioCode, ValuationDate, load_checksum
        , _FILE_NAME
        , extract_timestamp
        , row_num
        , " task_instance.xcom_pull(task_ids='get_run_id') " AS wf_id
    FROM cteRanked 
    WHERE row_num = 1
) S
ON
    S.id = T.id
WHEN NOT MATCHED THEN
    INSERT (id, PortfolioCode, ValuationDate, load_checksum, insert_time, file_name, extract_timestamp, wf_id)
    VALUES (id, PortfolioCode, ValuationDate, load_checksum, CURRENT_TIMESTAMP(), _FILE_NAME, extract_timestamp, wf_id)
WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
    T.id = S.id, T.PortfolioCode = S.PortfolioCode, T.ValuationDate = S.ValuationDate, T.load_checksum = S.load_checksum, T.file_name = S._FILE_NAME, T.extract_timestamp = S.extract_timestamp, T.wf_id = S.wf_id

这意味着克隆文件而不更改文件名中的 extract_timestamp 将随机选择两行之一。在正常运行中,我们期望具有更新数据的后续提取是具有新 extract_timetamp 的源文件。然后上面的查询将选择最新的记录合并到目标表中。

【讨论】:

以上是关于BigQuery MERGE 意外的行重复的主要内容,如果未能解决你的问题,请参考以下文章

Bigquery:检查流期间的重复项

上个月的最后一天 - BigQuery

如何禁止向BigQuery加载重复的行?

Bigquery 返回重复的行以及错误的计数

JSON 表架构到 bigquery.TableSchema 用于 BigQuerySink

根据条件对 BigQuery 中的行进行重复数据删除