在相同的气流执行中创建和删除 BQ 临时表,然后在下一次运行时先创建表但表删除不起作用
Posted
技术标签:
【中文标题】在相同的气流执行中创建和删除 BQ 临时表,然后在下一次运行时先创建表但表删除不起作用【英文标题】:Creating and Dropping a BQ temp table in same execution of airflow and then in next run while the table is created first but table drop is not working 【发布时间】:2021-11-13 12:12:01 【问题描述】:在相同的气流执行中创建和删除 BQ 临时表,然后在下一次运行时先创建该表但无法识别该表以进行删除。
第一次执行:
创建_BQ 临时表,然后删除该表。 DAG 运行良好。
第二次执行:
最初表不可用,因为在之前的执行中删除。在这次执行中;在检查 table_name == 'temp_table_name' 时首先创建表并在删除之前创建表;它返回 false 并且表删除未执行。
【问题讨论】:
如果您觉得我的回答对您的问题有帮助,请考虑按照*** guidelines接受/投票 【参考方案1】:您面临的问题可能是因为 DAG 是从一个回溯日期安排的。在这种情况下,Airflow 不会在连续 DAG 运行之间给出指定的 schedule_interval。似乎这使得在 BigQuery 中更新表元数据的时间非常少。因此,我将参数 retries 设置为 1,retry_delay 设置为 2 分钟,即如果删除任务失败,将在等待 2 分钟后重试删除任务。此外,在您的情况下,每次都使用相同的名称创建表,这可能会导致并发运行的 DAG 的任务失败,因此我将 depends_on_past 设置为 True 以便后续 DAG 的任务运行仅当上一次 DAG 运行成功时。
你可以参考下面的代码。
代码 1:
import os
import time
import datetime
from urllib.parse import urlparse
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyTableOperator,
BigQueryDeleteTableOperator
)
from airflow.utils.dates import days_ago
PROJECT_ID = "my-project"
BQ_LOCATION = "us-east1"
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_dag_args =
'start_date': YESTERDAY,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=2),
'depends_on_past':True,
with models.DAG(
"DAG9",
schedule_interval=datetime.timedelta(minutes=5),
default_args=default_dag_args ) as dag:
# [START howto_operator_bigquery_create_table]
create_table = BigQueryCreateEmptyTableOperator(
task_id="create_table",
dataset_id="demo14",
table_id="tab",
schema_fields=[
"name": "emp_name", "type": "STRING", "mode": "REQUIRED",
"name": "salary", "type": "INTEGER", "mode": "NULLABLE",
],
)
delete_table = BigQueryDeleteTableOperator(
task_id="delete_table",
deletion_dataset_table="my-project.demo14.tab",
)
create_table >> delete_table
当我计划从当前日期开始运行 DAG 时,我没有遇到任何不一致的情况。可以参考下面的代码。
代码 2:
import os
import time
import datetime
from urllib.parse import urlparse
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyTableOperator,
BigQueryDeleteTableOperator
)
from airflow.utils.dates import days_ago
PROJECT_ID = "my-project"
BQ_LOCATION = "us-east1"
with models.DAG(
"DAG5",
schedule_interval=datetime.timedelta(minutes=5), # Override to match your needs
start_date=datetime.datetime(2021,11,14,7,30),
tags=["example"],
) as dag:
# [START howto_operator_bigquery_create_table]
create_table = BigQueryCreateEmptyTableOperator(
task_id="create_table",
dataset_id="demo12",
table_id="tab",
schema_fields=[
"name": "emp_name", "type": "STRING", "mode": "REQUIRED",
"name": "salary", "type": "INTEGER", "mode": "NULLABLE",
],
)
delete_table = BigQueryDeleteTableOperator(
task_id="delete_table",
deletion_dataset_table="my-project.demo12.tab",
)
create_table >> delete_table
【讨论】:
以上是关于在相同的气流执行中创建和删除 BQ 临时表,然后在下一次运行时先创建表但表删除不起作用的主要内容,如果未能解决你的问题,请参考以下文章
如何使用相同的表单在 ruby on rails 中创建和编辑
Oracle Sql Developer如何在Oracle中创建和设置角色?