在相同的气流执行中创建和删除 BQ 临时表,然后在下一次运行时先创建表但表删除不起作用

Posted

技术标签:

【中文标题】在相同的气流执行中创建和删除 BQ 临时表,然后在下一次运行时先创建表但表删除不起作用【英文标题】:Creating and Dropping a BQ temp table in same execution of airflow and then in next run while the table is created first but table drop is not working 【发布时间】:2021-11-13 12:12:01 【问题描述】:

在相同的气流执行中创建和删除 BQ 临时表,然后在下一次运行时先创建该表但无法识别该表以进行删除。

第一次执行:

创建_BQ 临时表,然后删除该表。 DAG 运行良好。

第二次执行:

最初表不可用,因为在之前的执行中删除。在这次执行中;在检查 table_name == 'temp_table_name' 时首先创建表并在删除之前创建表;它返回 false 并且表删除未执行。

【问题讨论】:

如果您觉得我的回答对您的问题有帮助,请考虑按照*** guidelines接受/投票 【参考方案1】:

您面临的问题可能是因为 DAG 是从一个回溯日期安排的。在这种情况下,Airflow 不会在连续 DAG 运行之间给出指定的 schedule_interval。似乎这使得在 BigQuery 中更新表元数据的时间非常少。因此,我将参数 retries 设置为 1,retry_delay 设置为 2 分钟,即如果删除任务失败,将在等待 2 分钟后重试删除任务。此外,在您的情况下,每次都使用相同的名称创建表,这可能会导致并发运行的 DAG 的任务失败,因此我将 depends_on_past 设置为 True 以便后续 DAG 的任务运行仅当上一次 DAG 运行成功时。

你可以参考下面的代码。

代码 1:

import os
import time
import datetime
from urllib.parse import urlparse

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (

    BigQueryCreateEmptyTableOperator,
    BigQueryDeleteTableOperator
)
from airflow.utils.dates import days_ago

PROJECT_ID = "my-project"
BQ_LOCATION = "us-east1"
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = 

    'start_date': YESTERDAY,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=2),
    'depends_on_past':True,
    

with models.DAG(
    "DAG9",
    schedule_interval=datetime.timedelta(minutes=5),
    default_args=default_dag_args ) as dag:
    
    # [START howto_operator_bigquery_create_table]
    create_table = BigQueryCreateEmptyTableOperator(
        task_id="create_table",
        dataset_id="demo14",
        table_id="tab",
        schema_fields=[
            "name": "emp_name", "type": "STRING", "mode": "REQUIRED",
            "name": "salary", "type": "INTEGER", "mode": "NULLABLE",
        ],
    )

    delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table="my-project.demo14.tab",

    )

    create_table >> delete_table

当我计划从当前日期开始运行 DAG 时,我没有遇到任何不一致的情况。可以参考下面的代码。

代码 2:

import os
import time
import datetime
from urllib.parse import urlparse

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (

    BigQueryCreateEmptyTableOperator,
    BigQueryDeleteTableOperator
)
from airflow.utils.dates import days_ago

PROJECT_ID = "my-project"
BQ_LOCATION = "us-east1"


with models.DAG(
    "DAG5",
    schedule_interval=datetime.timedelta(minutes=5),  # Override to match your needs
    start_date=datetime.datetime(2021,11,14,7,30),
    tags=["example"],
) as dag:
    # [START howto_operator_bigquery_create_table]
    create_table = BigQueryCreateEmptyTableOperator(
        task_id="create_table",
        dataset_id="demo12",
        table_id="tab",
        schema_fields=[
            "name": "emp_name", "type": "STRING", "mode": "REQUIRED",
            "name": "salary", "type": "INTEGER", "mode": "NULLABLE",
        ],
    )

    delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table="my-project.demo12.tab",

    )

    create_table >> delete_table

【讨论】:

以上是关于在相同的气流执行中创建和删除 BQ 临时表,然后在下一次运行时先创建表但表删除不起作用的主要内容,如果未能解决你的问题,请参考以下文章

在java中创建和初始化对象

如何使用相同的表单在 ruby​​ on rails 中创建和编辑

如何将数据帧传递到气流任务的临时表中

Oracle Sql Developer如何在Oracle中创建和设置角色?

在 azure web 角色的临时文件夹中创建和读取 html 文件并提供它

在 SQL 中创建和更改表值函数时对象类型不兼容