BigQueryOperator 在 write_disposition='WRITE_TRUNCATE' 时更改表架构和列模式
Posted
技术标签:
【中文标题】BigQueryOperator 在 write_disposition=\'WRITE_TRUNCATE\' 时更改表架构和列模式【英文标题】:BigQueryOperator changes the table schema and column modes when write_disposition='WRITE_TRUNCATE'BigQueryOperator 在 write_disposition='WRITE_TRUNCATE' 时更改表架构和列模式 【发布时间】:2020-10-20 07:50:05 【问题描述】:我正在使用 Airflow 的 BigQueryOperator 使用 write_disposition='WRITE_TRUNCATE' 填充 BQ 表。问题是每次任务运行时,它都会将表模式和列模式从Required更改为Nullable。我使用的 create_disposition 是“CREATE_NEVER”。由于我的表是预先创建的,因此我不希望更改架构或列模式。使用 write_disposition='WRITE_APPEND' 解决了这个问题,但我的要求是使用 WRITE_TRUNCATE。知道为什么 BigQueryOperator 会更改架构和模式吗?
【问题讨论】:
【参考方案1】:我有一个类似的问题,不是必需/可为空的 shcema 值,而是在策略标签上,并且行为是相同的:策略标签被覆盖(并丢失)。以下是 Google 支持团队的答案:
如果您覆盖到目标表,则会从表中删除任何现有的策略标签,除非您使用 --destination_schema 标志来指定带有策略标签的架构。 对于 WRITE_TRUNCATE,处置会覆盖现有表和架构。如果要保留策略标签,可以使用“--destination_schema”指定带有策略标签的架构。
但是,通过我在 python 中的测试,我观察到 QueryJob(基于 sql 查询的作业并将结果放入表中)和 LoadJob(从文件加载数据并接收数据的作业)之间的 2 种不同行为在表格中)。
如果您执行 LoadJob, 删除架构自动检测 获取原始表的架构 执行加载作业像这样在 Python 中
job_config = bigquery.job.LoadJobConfig()
job_config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
job_config.write_disposition = bigquery.job.WriteDisposition.WRITE_TRUNCATE
job_config.skip_leading_rows = 1
# job_config.autodetect = True
job_config.schema = client.get_table(table).schema
query_job = client.load_table_from_uri(uri, table, job_config=job_config)
res = query_job.result()
此解决方案用于复制架构,不适用于 QueryJob
解决方法如下(适用于 LoadJob 和 QueryJob)
截断表格 在 WRITE_EMPTY 模式下执行作业权衡:
WRITE_TRUNCATE 是原子的:如果写入失败,数据不会被截断 解决方法分两步:如果写入失败,则数据已被删除 config = bigquery.job.QueryJobConfig()
config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
config.write_disposition = bigquery.job.WriteDisposition.WRITE_EMPTY
# Don't work
# config.schema = client.get_table(table).schema
config.destination = table
# Step 1 truncate the table
query_job = client.query(f'TRUNCATE TABLE `table`')
res = query_job.result()
# Step 2: Load the new data
query_job = client.query(request, job_config=job_config)
res = query_job.result()
所有这些都是为了告诉您 Airflow 上的 BigQuery 运算符不是问题所在。这是一个 BigQuery 问题。你有一个解决方法来实现你想要的。
【讨论】:
以上是关于BigQueryOperator 在 write_disposition='WRITE_TRUNCATE' 时更改表架构和列模式的主要内容,如果未能解决你的问题,请参考以下文章
如何在 BigQueryOperator 上参数化 write_disposition?
bigqueryoperator 气流上的 Bigquery 脚本
在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用