BigQuery MERGE 意外的行重复
Posted
技术标签:
【中文标题】BigQuery MERGE 意外的行重复【英文标题】:BigQuery MERGE unexpected row duplication 【发布时间】:2020-08-20 11:02:56 【问题描述】:我正在使用标准 SQL MERGE 来更新基于源外部表的常规目标表,该源外部表是存储桶中的一组 CVS 文件。这是一个简化的输入文件:
$ gsutil cat gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv
"id" "PortfolioCode" "ValuationDate" "load_checksum"
"1" "CIMDI000TT" "2020-03-28" "checksum1"
MERGE 语句是:
MERGE xx_producer_conformed.demo T
USING xx_producer_raw.demo_raw S
ON
S.id = T.id
WHEN NOT MATCHED THEN
INSERT (id, PortfolioCode, ValuationDate, load_checksum, insert_time, file_name, extract_timestamp, wf_id)
VALUES (id, PortfolioCode, ValuationDate, load_checksum, CURRENT_TIMESTAMP(), _FILE_NAME, REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]8_([0-9]14).tsv'),'scheduled__2020-08-19T16:24:00+00:00')
WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
T.id = S.id, T.PortfolioCode = S.PortfolioCode, T.ValuationDate = S.ValuationDate, T.load_checksum = S.load_checksum, T.file_name = S._FILE_NAME, T.extract_timestamp = REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]8_([0-9]14).tsv'), T.wf_id = 'scheduled__2020-08-19T16:24:00+00:00'
如果我擦除目标表并重新运行 MERGE,我得到的行修改计数为 1:
bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql | awk 'ORS=" "')"
Waiting on bqjob_r288f8d33_000001740b413532_1 ... (0s) Current status: DONE
Number of affected rows: 1
这成功导致目标表更新:
$ bq query --format=csv --max_rows=10 --use_legacy_sql=false "select * from ta_producer_conformed.demo"
Waiting on bqjob_r7f6b6a46_000001740b5057a3_1 ... (0s) Current status: DONE
id,PortfolioCode,ValuationDate,load_checksum,insert_time,file_name,extract_timestamp,wf_id
1,CIMDI000TT,2020-03-28,checksum1,2020-08-20 09:44:20,gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv,20200505050505,scheduled__2020-08-19T16:24:00+00:00
如果我再次返回 MERGE,我得到的行修改计数为 0:
$ bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql | awk 'ORS=" "')"
Waiting on bqjob_r3de2f833_000001740b4161b3_1 ... (0s) Current status: DONE
Number of affected rows: 0
这不会导致目标表发生任何更改。所以一切正常。
问题是,当我在一个更复杂的示例上运行代码时,我将许多输入文件插入到一个空的目标表中,我最终会得到具有相同 id
的行,其中 count(id)
不等于 @987654328 @:
$ bq query --use_legacy_sql=false --max_rows=999999 --location=asia-east2 "select count(id) as total_records from xx_producer_conformed.xxx; select count(distinct id) as unique_records from xx_producer_conformed.xxx; "
Waiting on bqjob_r5df5bec8_000001740b7dfa50_1 ... (1s) Current status: DONE
select count(id) as total_records from xx_producer_conformed.xxx; -- at [1:1]
+---------------+
| total_records |
+---------------+
| 11582 |
+---------------+
select count(distinct id) as unique_records from xx_producer_conformed.xxx; -- at [1:78]
+----------------+
| unique_records |
+----------------+
| 5722 |
+----------------+
这让我感到惊讶,因为我的期望是底层逻辑将逐步遍历每个底层文件中的每一行,并且只插入第一个 id
然后在任何后续 id
上插入它会更新。所以我的期望是输入存储桶中的行数不能超过唯一的id
s。
如果我再次尝试运行 MERGE,它会告诉我目标表中有不止一行具有相同的 id:
$ bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql | awk 'ORS=" "')"
Waiting on bqjob_r2fe783fc_000001740b8271aa_1 ... (0s) Current status: DONE
Error in query string: Error processing job 'xxxx-10843454-datamesh-
dev:bqjob_r2fe783fc_000001740b8271aa_1': UPDATE/MERGE must match at most one
source row for each target row
我原以为当 MERGE 语句插入时不会有两行具有相同的“id”。
使用的所有表和查询都是从列出“业务列”的文件中生成的。因此,上面的简单演示示例在登录和 MERGE 语句中的连接方面与全面查询相同。
为什么上面的 MERGE 查询会导致“id”重复的行,我该如何解决这个问题?
【问题讨论】:
【参考方案1】:通过擦除目标表并复制一个相对较大的输入作为输入,这个问题很容易重复:
AAAA_20200805_20200814200000.tsv
AAAA_clone_20200805_20200814200000.tsv
我相信其核心是并行性。许多文件的单个大型 MERGE 可以并行生成许多工作线程。任何两个并行运行的工作线程加载不同的文件以立即“看到”彼此的插入将非常慢。相反,我希望它们独立运行,而不是“看到”彼此写入单独的缓冲区。当缓冲区最终合并时,它会导致多个插入具有相同的id
。
为了解决这个问题,我正在使用一些 CTE 根据 extract_timestamp 使用 ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC)
来选择任何 id
的最新记录。然后我们可以按最低值过滤以选择最新版本的记录。完整的查询是:
MERGE xx_producer_conformed.demo T
USING (
WITH cteExtractTimestamp AS (
SELECT
id, PortfolioCode, ValuationDate, load_checksum
, _FILE_NAME
, REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]8_([0-9]14).tsv') AS extract_timestamp
FROM
xx_producer_raw.demo_raw
),
cteRanked AS (
SELECT
id, PortfolioCode, ValuationDate, load_checksum
, _FILE_NAME
, extract_timestamp
, ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC) AS row_num
FROM
cteExtractTimestamp
)
SELECT
id, PortfolioCode, ValuationDate, load_checksum
, _FILE_NAME
, extract_timestamp
, row_num
, " task_instance.xcom_pull(task_ids='get_run_id') " AS wf_id
FROM cteRanked
WHERE row_num = 1
) S
ON
S.id = T.id
WHEN NOT MATCHED THEN
INSERT (id, PortfolioCode, ValuationDate, load_checksum, insert_time, file_name, extract_timestamp, wf_id)
VALUES (id, PortfolioCode, ValuationDate, load_checksum, CURRENT_TIMESTAMP(), _FILE_NAME, extract_timestamp, wf_id)
WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
T.id = S.id, T.PortfolioCode = S.PortfolioCode, T.ValuationDate = S.ValuationDate, T.load_checksum = S.load_checksum, T.file_name = S._FILE_NAME, T.extract_timestamp = S.extract_timestamp, T.wf_id = S.wf_id
这意味着克隆文件而不更改文件名中的 extract_timestamp 将随机选择两行之一。在正常运行中,我们期望具有更新数据的后续提取是具有新 extract_timetamp 的源文件。然后上面的查询将选择最新的记录合并到目标表中。
【讨论】:
以上是关于BigQuery MERGE 意外的行重复的主要内容,如果未能解决你的问题,请参考以下文章