将 BigQuery 查询结果行写入 csv 文件时,某些记录重复

Posted

技术标签:

【中文标题】将 BigQuery 查询结果行写入 csv 文件时,某些记录重复【英文标题】:Some records are duplicated when writing BigQuery query result rows to csv file 【发布时间】:2021-12-16 17:05:07 【问题描述】:

我正在使用以下脚本从 bigquery 表中进行选择,然后遍历查询作业结果行并一次写入一行到 csv 文件。

我的问题是,对于某些查询作业,写入的总行数与预期相符,但有些行重复而其他行丢失。例如,查询本身可能返回 25k 行并且没有重复。但是在写入的25k行中,与查询结果相比,有15条记录重复,15条记录丢失。

一个有问题的文件在压缩前超过 3Gb,但对于其他查询(选择相同的列但不同的 visitStartTime 范围),该文件会更大,但不会出现重复问题。为什么对于某些查询作业,一些记录会重复写入,而另一些则根本不写入,这是否有原因?

from google.cloud import bigquery
from google.oauth2 import service_account
import csv

query_string = """select c1,c2,c3,c4,c5 
                  from `mydb.mydataset.mytable_20211212` 
                  where visitStartTime >= 1639350000.0 AND 
                  visitStartTime < 1639353600.0"""

credentials = service_account.Credentials.from_service_account_file(key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],)
client = bigquery.Client(credentials=credentials, project=credentials.project_id, )
query_job = client.query(query_string)

with open('myfilename.csv', 'w', newline='', encoding="utf-8") as csvfile:

    writer = csv.writer(csvfile, quoting=csv.QUOTE_NONE, delimiter='|', quotechar='', escapechar='\\')

    # write header row
    writer.writerow(["c1","c2","c3","c4","c5"])

    # write data rows
    for row in query_job:
        writer.writerow([row.c1, row.c2, row.c3, row.c4, row.c5])

【问题讨论】:

【参考方案1】:

根据我对您的问题的理解,如果您有 15 条记录重复,那么您将丢失另外 15 条记录(相同的数字)。如果 CSV 结果与查询不同,即使 CSV 是精确查询,那么问题一定出在 CSV 写入中。我建议您查看exporting data documentation 并查看给定的示例。由于您正在使用的大小,我将首先尝试 extract the compressed table 到一个存储桶:

from google.cloud import bigquery
client = bigquery.Client()
bucket_name = '[BUCKET_NAME]'

destination_uri = "gs:///".format(bucket_name, "[TABLE_NAME].csv.gz")
dataset_ref = bigquery.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table("[TABLE_NAME]")
job_config = bigquery.job.ExtractJobConfig()
job_config.compression = bigquery.Compression.GZIP

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location="[LOCATION]",
    job_config=job_config
)
extract_job.result()

如果这不起作用,我会尝试export the data in multiple files 。

如果其中任何一个都如您所愿,那么问题很可能在于query_string 的编写方式。您是否考虑过使用DISTINCT 或设置更多过滤器?

另外,问题也可能出在导出的数据上。你有没有试过,看到导出的“错误数据”,将visitStartTime设置为只导出这个“错误数据”?

【讨论】:

我无法导出压缩的完整表,因为它会远远超过 1GB 的最大值以供下游数据库加载(我目前正在根据 visitStartTime 分批导出 1 小时)。我尝试了较小的批次,但最终丢失了相同的欺骗/记录。我还尝试仅在发生重复/丢失记录时导出问题时间范围,并发现 Web UI 返回预期的记录数而没有重复,但 python 客户端为完全相同的查询返回 0 条记录。

以上是关于将 BigQuery 查询结果行写入 csv 文件时,某些记录重复的主要内容,如果未能解决你的问题,请参考以下文章

如何正确迭代所有 BigQuery 结果行?

如何计算查询结果行数?

将 BigQuery 查询结果直接写入 GCS - 可以吗?

如何将 SQL 查询结果行作为具有不同标题名称的列? [关闭]

怎样用sqlserver将查询结果行转列

如何在全范围内平均减少 SQL 查询的结果行?