Airflow 将 jinja 模板作为字符串

Posted

技术标签:

【中文标题】Airflow 将 jinja 模板作为字符串【英文标题】:Airflow is taking jinja template as string 【发布时间】:2018-11-26 07:33:48 【问题描述】:

在 Airflow 中,我尝试使用气流中的 jinja 模板,但问题是它没有被解析,而是被视为字符串。请看我的代码 ``

from datetime import datetime

from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

def test_method(dag,network_id,schema_name):
    print "Schema_name in test_method", schema_name
    third_task = PythonOperator(
        task_id='first_task_' + network_id,
        provide_context=True,
        python_callable=print_context2,
        dag=dag)
    return third_task

dag = DAG('testing_xcoms_pull', description='Testing Xcoms',
          schedule_interval='0 12 * * *',
          start_date= datetime.today(),
          catchup=False)


def print_context(ds, **kwargs):
    return 'Returning from print_context'

def print_context2(ds, **kwargs):
    return 'Returning from print_context2'

def get_schema(ds, **kwargs):
    # Returning schema name based on network_id
    schema_name = "my_schema"
    return get_schema

first_task = PythonOperator(
    task_id='first_task',
    provide_context=True,
    python_callable=print_context,
    dag=dag)


second_task = PythonOperator(
    task_id='second_task',
    provide_context=True,
    python_callable=get_schema,
    dag=dag)

network_id = ' dag_run.conf["network_id"]'

first_task >> second_task >> test_method(
                    dag=dag,
                    network_id=network_id,
                    schema_name=' ti.xcom_pull("second_task")')

``

Dag 创建失败,因为 ' dag_run.conf["network_id"]' 被气流视为字符串。谁能帮我解决我的代码中的问题???

【问题讨论】:

Airflow Jinja Rendered Template的可能重复 【参考方案1】:

气流运算符有一个名为 template_fields 的变量。这个变量通常声明在操作符类的顶部,查看 github 代码库中的任何操作符。

如果您尝试将 Jinja 模板语法传递到的字段不在 template_fields 列表中,则 jinja 语法将显示为字符串。

【讨论】:

【参考方案2】:

DAG 对象及其定义代码不会在执行上下文中解析,而是根据 Python 加载时可用的环境进行解析。

用于在函数中定义task_idnetwork_id 变量在执行之前没有模板化,因为没有执行活动,所以不能模板化。即使使用模板,您仍然需要一个有效的、静态的、非模板化的 task_id 值来实例化 DAG 对象。

【讨论】:

以上是关于Airflow 将 jinja 模板作为字符串的主要内容,如果未能解决你的问题,请参考以下文章

使用 jinja 模板中的 Airflow 连接

在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用

airflow任务运行抛出jinja2.exceptions.TemplateNotFound

Airflow Jinja模板无法与自定义操作员一起使用

无法在带有气流的 jinja 模板中使用 python 变量

获取与 django 1.x 集成的 jinja2 模板的翻译字符串?