Airflow 将 jinja 模板作为字符串
Posted
技术标签:
【中文标题】Airflow 将 jinja 模板作为字符串【英文标题】:Airflow is taking jinja template as string 【发布时间】:2018-11-26 07:33:48 【问题描述】:在 Airflow 中,我尝试使用气流中的 jinja 模板,但问题是它没有被解析,而是被视为字符串。请看我的代码 ``
from datetime import datetime
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
def test_method(dag,network_id,schema_name):
print "Schema_name in test_method", schema_name
third_task = PythonOperator(
task_id='first_task_' + network_id,
provide_context=True,
python_callable=print_context2,
dag=dag)
return third_task
dag = DAG('testing_xcoms_pull', description='Testing Xcoms',
schedule_interval='0 12 * * *',
start_date= datetime.today(),
catchup=False)
def print_context(ds, **kwargs):
return 'Returning from print_context'
def print_context2(ds, **kwargs):
return 'Returning from print_context2'
def get_schema(ds, **kwargs):
# Returning schema name based on network_id
schema_name = "my_schema"
return get_schema
first_task = PythonOperator(
task_id='first_task',
provide_context=True,
python_callable=print_context,
dag=dag)
second_task = PythonOperator(
task_id='second_task',
provide_context=True,
python_callable=get_schema,
dag=dag)
network_id = ' dag_run.conf["network_id"]'
first_task >> second_task >> test_method(
dag=dag,
network_id=network_id,
schema_name=' ti.xcom_pull("second_task")')
``
Dag 创建失败,因为 ' dag_run.conf["network_id"]'
被气流视为字符串。谁能帮我解决我的代码中的问题???
【问题讨论】:
Airflow Jinja Rendered Template的可能重复 【参考方案1】:气流运算符有一个名为 template_fields 的变量。这个变量通常声明在操作符类的顶部,查看 github 代码库中的任何操作符。
如果您尝试将 Jinja 模板语法传递到的字段不在 template_fields 列表中,则 jinja 语法将显示为字符串。
【讨论】:
【参考方案2】:DAG
对象及其定义代码不会在执行上下文中解析,而是根据 Python 加载时可用的环境进行解析。
用于在函数中定义task_id
的network_id
变量在执行之前没有模板化,因为没有执行活动,所以不能模板化。即使使用模板,您仍然需要一个有效的、静态的、非模板化的 task_id
值来实例化 DAG
对象。
【讨论】:
以上是关于Airflow 将 jinja 模板作为字符串的主要内容,如果未能解决你的问题,请参考以下文章
在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用
airflow任务运行抛出jinja2.exceptions.TemplateNotFound