Getting a template error while triggering SQL using Airflow from Python?
Posted: 2020-04-14 04:10:26
I am trying to run a SQL script from Airflow, and it fails with a template error.
The script essentially runs SQL against Athena and loads the result into a Redshift table.
The SQL file is located at: redshift/sql/public/flow/KN_AWS_RE_ShipmentData_dup_update.sql
My Airflow code:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG('sample_sql', default_args=default_args, schedule_interval='0 4 * * *')
execute_notebook = PostgresOperator(
    task_id='sample_sql',
    postgres_conn_id='REDSHIFT_CONN',
    sql="redshift/sql/public/flow/KN_AWS_RE_ShipmentData_dup_update.sql",
    params={'limit': '50'},
    dag=dag
)
The error:
[2020-04-14 02:19:24,412] standard_task_runner.py:52 INFO - Started process 23012 to run task
[2020-04-14 02:19:24,482] logging_mixin.py:112 INFO - [2020-04-14 02:19:24,481] dagbag.py:403 INFO - Filling up the DagBag from /usr/local/airflow/dags/Scripts/Sample.py
[2020-04-14 02:19:24,495] baseoperator.py:807 ERROR - KN_AWS_RE_ShipmentData_dup_update.sql
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 805, in resolve_template_files
setattr(self, field, env.loader.get_source(env, content)[0])
File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 187, in get_source
raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: KN_AWS_RE_ShipmentData_dup_update.sql
[2020-04-14 02:19:24,545] logging_mixin.py:112 INFO - Running %s on host %s <TaskInstance: sample_sql.sample_sql 2020-04-14T02:14:08.020072+00:00 [running]> 0ca54c719ff7
[2020-04-14 02:19:24,583] taskinstance.py:1088 ERROR - KN_AWS_RE_ShipmentData_dup_update.sql
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 940, in _run_raw_task
self.render_templates(context=context)
How can I fix this?
Answer 1:

You have to specify the path to your .sql template files at DAG instantiation time, via the template_searchpath argument. By default, Jinja only looks in your DAG folder.
Note that your DAG also contains a bad practice: using a dynamic start_date. The start_date should be fixed (i.e. datetime(2020, 4, 13)) rather than dynamic (i.e. datetime.now()). You can read more about it here.
That said, I would change the DAG definition to:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
# Remove this
# yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 13),  # Change this
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG(
    'sample_sql',
    default_args=default_args,
    schedule_interval='0 4 * * *',
    template_searchpath='/redshift/sql/public/flow/')

execute_notebook = PostgresOperator(
    task_id='sample_sql',
    postgres_conn_id='REDSHIFT_CONN',
    sql='KN_AWS_RE_ShipmentData_dup_update.sql',
    params={'limit': '50'},
    dag=dag
)
execute_notebook  # Tell Airflow the task dependencies; in this case there are none
Of course, you should pick the correct absolute base path to assign to template_searchpath, for example /home/redshift/sql/public/flow.
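To see why the lookup fails without template_searchpath, the behaviour can be reproduced with plain Jinja, which is roughly what Airflow does under the hood when it resolves the templated sql field. This is a minimal sketch; the file name query.sql and the LIMIT query are hypothetical stand-ins for the real script:

```python
import pathlib
import tempfile

from jinja2 import Environment, FileSystemLoader

# A hypothetical template file standing in for the real SQL script.
tmp = tempfile.mkdtemp()
pathlib.Path(tmp, "query.sql").write_text(
    "SELECT * FROM shipments LIMIT {{ params.limit }};"
)

# Jinja only finds templates under the loader's search paths. If the
# directory holding the .sql file is not among them, get_template raises
# TemplateNotFound -- the same error Airflow surfaced above.
env = Environment(loader=FileSystemLoader(tmp))
sql = env.get_template("query.sql").render(params={"limit": "50"})
print(sql)  # SELECT * FROM shipments LIMIT 50;
```

Passing template_searchpath to the DAG simply adds your directory to that loader's search paths, so the relative file name resolves.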