如何在 BigQueryOperator 上参数化 write_disposition?

Posted

技术标签:

【中文标题】如何在 BigQueryOperator 上参数化 write_disposition?【英文标题】:How to parameterize write_disposition on a BigQueryOperator? 【发布时间】:2019-12-13 14:39:29 【问题描述】:

我正在尝试将 jenkins 上的当前管道调整为由气流编排。 我们的需求之一是能够通过手动触发器重新处理日期循环中的历史数据。

假设我刚刚在我的管道/dag 上添加了一个新阶段,并且我想在过去 n 天内执行它,而不更改已创建的表,我可以将所有 BigQueryOperators 设置为在 write_disposition 上工作:'WRITE_EMPTY'

我知道如果 write_disposition 是一个模板字段,那就这么简单:

BigQueryOperator(
            task_id="table_x",
            bql='sql_folder/my_query.sql',
            destination_dataset_table="project_id.dataset_id.my_table_ dag_run.conf["date_suffix"] ",
            write_disposition=' dag_run.conf["write_disposition"] ',
            params=
                 'event_date': " dag_run.conf["event_date"] "
            
        )

在日期循环上调用 dag:

airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '"date_suffix":"20191213", "event_date":"2019-12-13", "write_disposition": ""'

但事实并非如此。

有什么解决方法吗?还是另一种方法?

【问题讨论】:

【参考方案1】:

您可以为您的用例创建一个继承自 BigQueryOperatorCustomBigQueryOperator

例子:

class CustomBigQueryOperator(BigQueryOperator):
    template_fields = ('sql', 'destination_dataset_table', 'labels', 'write_disposition')

并在你的 DAG 中使用它。

【讨论】:

好@kaxil!谢谢!知道是否可以将任何异常参数传递给 jinja 模板?我的意思是:从日期时间导入日期时间; dag_run.conf["date_suffix"] if dag_run else datetime.now().strftime('%Y%m%d').

以上是关于如何在 BigQueryOperator 上参数化 write_disposition?的主要内容,如果未能解决你的问题,请参考以下文章

在 BigQueryOperator 中拉取 xcom

Apache Airflow - BigQueryOperator:如何动态设置destination_dataset_table 分区

可以使用气流 BigQueryOperator 制作 DDL bigquery 语句吗?

在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用

BigQueryOperator 在 write_disposition='WRITE_TRUNCATE' 时更改表架构和列模式

bigqueryoperator 是 write_truncate 1 个事务或 2 个事务