在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用
Posted
技术标签:
【中文标题】在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用【英文标题】:Using Jinja template variables with BigQueryOperator in Airflow 【发布时间】:2019-07-30 21:47:33 【问题描述】:我正在尝试通过使用变量填充 sql=
属性来在 Airflow 中使用 BigQueryOperator。我遇到的问题是使用 Jinja 变量时文件扩展名被删除。我的代码设置如下:
dag = DAG(
dag_id='data_ingest_dag',
template_searchpath=['/home/airflow/gcs/dags/sql/'],
default_args=DEFAULT_DAG_ARGS
)
bigquery_transform = BigQueryOperator(
task_id='bq-transform',
write_disposition='WRITE_TRUNCATE',
sql="dag_run.conf['sql_script']",
destination_dataset_table='dag_run.conf["destination_dataset_table"]',
dag=dag
)
传递的变量包含存储在单独 SQL 目录中的 SQL 文件的名称。如果我将该值作为静态字符串 sql="example_file.sql"
传递,则一切正常。但是,当我使用 Jinja 模板变量传递 example_file.sql
时,它会自动删除文件扩展名并收到此错误:
BigQuery job failed.
Final error was: u'reason': u'invalidQuery', u'message': u'Syntax error: Unexpected identifier "example_file" at [1:1]', u'location': u'query'
此外,我尝试将“.sql”硬编码到变量的末尾,以防扩展名将被删除。但是,这会导致整个变量引用被解释为字符串。
如何使用变量填充 BigQueryOperator 属性?
【问题讨论】:
【参考方案1】:阅读 BigQuery 运算符docstring 似乎可以通过 2 种方式提供 sql 语句: 1. 作为可以包含模板宏的字符串 2. 对可以包含模板宏的文件的引用(文件,而不是文件名)。
您不能模板化文件名,只能模板化 SQL 语句。事实上,您的错误消息显示 BigQuery 无法识别标识符“example_file”。如果您检查运行该查询的项目的 BigQuery 历史记录,您会看到查询字符串是“example_file.sql”,它不是有效的 SQL 语句,因此会出现错误。
【讨论】:
以上是关于在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用的主要内容,如果未能解决你的问题,请参考以下文章
无法在带有气流的 jinja 模板中使用 python 变量