BigQueryOperator 在 write_disposition='WRITE_TRUNCATE' 时更改表架构和列模式

Posted

技术标签:

【中文标题】BigQueryOperator 在 write_disposition=\'WRITE_TRUNCATE\' 时更改表架构和列模式【英文标题】:BigQueryOperator changes the table schema and column modes when write_disposition='WRITE_TRUNCATE'BigQueryOperator 在 write_disposition='WRITE_TRUNCATE' 时更改表架构和列模式 【发布时间】:2020-10-20 07:50:05 【问题描述】:

我正在使用 Airflow 的 BigQueryOperator 使用 write_disposition='WRITE_TRUNCATE' 填充 BQ 表。问题是每次任务运行时,它都会将表模式和列模式从Required更改为Nullable。我使用的 create_disposition 是“CREATE_NEVER”。由于我的表是预先创建的,因此我不希望更改架构或列模式。使用 write_disposition='WRITE_APPEND' 解决了这个问题,但我的要求是使用 WRITE_TRUNCATE。知道为什么 BigQueryOperator 会更改架构和模式吗?

【问题讨论】:

【参考方案1】:

我有一个类似的问题,不是必需/可为空的 shcema 值,而是在策略标签上,并且行为是相同的:策略标签被覆盖(并丢失)。以下是 Google 支持团队的答案:

如果您覆盖到目标表,则会从表中删除任何现有的策略标签,除非您使用 --destination_schema 标志来指定带有策略标签的架构。 对于 WRITE_TRUNCATE,处置会覆盖现有表和架构。如果要保留策略标签,可以使用“--destination_schema”指定带有策略标签的架构。

但是,通过我在 python 中的测试,我观察到 QueryJob(基于 sql 查询的作业并将结果放入表中)和 LoadJob(从文件加载数据并接收数据的作业)之间的 2 种不同行为在表格中)。

如果您执行 LoadJob, 删除架构自动检测 获取原始表的架构 执行加载作业

像这样在 Python 中

    job_config = bigquery.job.LoadJobConfig()
    job_config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
    job_config.write_disposition = bigquery.job.WriteDisposition.WRITE_TRUNCATE
    job_config.skip_leading_rows = 1
    # job_config.autodetect = True
    job_config.schema = client.get_table(table).schema

    query_job = client.load_table_from_uri(uri, table, job_config=job_config)
    res = query_job.result()

此解决方案用于复制架构,不适用于 QueryJob


解决方法如下(适用于 LoadJob 和 QueryJob)

截断表格 在 WRITE_EMPTY 模式下执行作业

权衡

WRITE_TRUNCATE 是原子的:如果写入失败,数据不会被截断 解决方法分两步:如果写入失败,则数据已被删除
    config = bigquery.job.QueryJobConfig()
    config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
    config.write_disposition = bigquery.job.WriteDisposition.WRITE_EMPTY
    # Don't work
    # config.schema = client.get_table(table).schema
    config.destination = table

    # Step 1 truncate the table
    query_job = client.query(f'TRUNCATE TABLE `table`')
    res = query_job.result()
    # Step 2: Load the new data
    query_job = client.query(request, job_config=job_config)
    res = query_job.result()

所有这些都是为了告诉您 Airflow 上的 BigQuery 运算符不是问题所在。这是一个 BigQuery 问题。你有一个解决方法来实现你想要的。

【讨论】:

以上是关于BigQueryOperator 在 write_disposition='WRITE_TRUNCATE' 时更改表架构和列模式的主要内容,如果未能解决你的问题,请参考以下文章

如何在 BigQueryOperator 上参数化 write_disposition?

bigqueryoperator 气流上的 Bigquery 脚本

在 BigQueryOperator 中拉取 xcom

在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用

可以使用气流 BigQueryOperator 制作 DDL bigquery 语句吗?

Airflow 的 bigqueryoperator 不能与 udf 一起使用