在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用

Posted

技术标签:

【中文标题】在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用【英文标题】:Using Jinja template variables with BigQueryOperator in Airflow 【发布时间】:2019-07-30 21:47:33 【问题描述】:

我正在尝试通过使用变量填充 sql= 属性来在 Airflow 中使用 BigQueryOperator。我遇到的问题是使用 Jinja 变量时文件扩展名被删除。我的代码设置如下:

dag = DAG(
    dag_id='data_ingest_dag',
    template_searchpath=['/home/airflow/gcs/dags/sql/'],
    default_args=DEFAULT_DAG_ARGS
)

bigquery_transform = BigQueryOperator(
    task_id='bq-transform',
    write_disposition='WRITE_TRUNCATE',
    sql="dag_run.conf['sql_script']",
    destination_dataset_table='dag_run.conf["destination_dataset_table"]',
    dag=dag
)

传递的变量包含存储在单独 SQL 目录中的 SQL 文件的名称。如果我将该值作为静态字符串 sql="example_file.sql" 传递,则一切正常。但是,当我使用 Jinja 模板变量传递 example_file.sql 时,它会自动删除文件扩展名并收到此错误:

BigQuery job failed.
Final error was: u'reason': u'invalidQuery', u'message': u'Syntax error: Unexpected identifier "example_file" at [1:1]', u'location': u'query'

此外,我尝试将“.sql”硬编码到变量的末尾,以防扩展名将被删除。但是,这会导致整个变量引用被解释为字符串。

如何使用变量填充 BigQueryOperator 属性?

【问题讨论】:

【参考方案1】:

阅读 BigQuery 运算符docstring 似乎可以通过 2 种方式提供 sql 语句: 1. 作为可以包含模板宏的字符串 2. 对可以包含模板宏的文件的引用(文件,而不是文件名)。

不能模板化文件名,只能模板化 SQL 语句。事实上,您的错误消息显示 BigQuery 无法识别标识符“example_file”。如果您检查运行该查询的项目的 BigQuery 历史记录,您会看到查询字符串是“example_file.sql”,它不是有效的 SQL 语句,因此会出现错误。

【讨论】:

以上是关于在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用的主要内容,如果未能解决你的问题,请参考以下文章

Airflow Jinja模板无法与自定义操作员一起使用

Airflow 将 jinja 模板作为字符串

Airflow Jinja 渲染模板

无法在带有气流的 jinja 模板中使用 python 变量

airflow任务运行抛出jinja2.exceptions.TemplateNotFound

Airflow 中文文档:使用操作器