无法在带有气流的 jinja 模板中使用 python 变量

Posted

技术标签:

【中文标题】无法在带有气流的 jinja 模板中使用 python 变量【英文标题】:Can't use python variable in jinja template with Airflow 【发布时间】:2021-02-03 11:16:50 【问题描述】:

我正在尝试使用 Airflow 在 AWS EMR 上运行 11 步,并遵循此 code 作为参考。由于使用 EmrAddStepsOperator 和 EmrStepSensor 进行 11 步会重复太多。所以我试图遍历它。我在我的 DAG 中使用了以下代码。

step_adder = list()
step_checker = list()
steps = ['step1', 'step2', 'step3', 'step4', 'step5', 'step6'...till step11]

# @evalcontextfilter
# def dangerous_render(context, value):
#     return Markup(Template(value).render(context)).render()

for i in range(0,len(steps)):
        #Add step
    step_adder.append(EmrAddStepsOperator(
        task_id=steps[i],
        job_flow_id=" task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') ",
        aws_conn_id='aws_default',
        steps=eval('step_'+str(i+1)),
    ))
    print(step_adder)
        #Step Sensor for checking
    step_checker.append(EmrStepSensor(
        task_id=steps[i]+'_check',
        job_flow_id=" task_instance.xcom_pull('create_job_flow', key='return_value') ",
        #step_id=""task_instance.xcom_pull(task_ids=, key='return_value')[0]",steps[i]",
        step_id='(Template(" "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] ").render('params': 'step': steps[i]))',
        aws_conn_id='aws_default',
    ))

我在这里遇到一个错误,EmrStepSensor 期望来自 EMR 的 step_id 在这里输入,并且是从 xcom 获取生成的(我猜,我不是 100% 确定这段代码是如何工作的)。但是我的步骤存储在步骤列表中,因此我无法在 step_id 的 task_id 中给出静态值,就像在参考代码中给出的那样,我无法弄清楚如何使用带有 python 变量值的 jinja 模板将值放在这里从步骤列表中。

我使用了以下两种方式,以便step_id可以根据steps[i]中的步骤名称从EMR中获取正确的步骤

step_id=""task_instance.xcom_pull(task_ids=, key='return_value')[0]",steps[i]",

step_id='(Template(" "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] ")

但是,这两个都因 Airflow 中的语法错误而失败。因此,如果有人能指出我这样做的正确方向,我将不胜感激。我正在使用 Airflow 1.10.12(这是 AWS 上托管 Apache Airflow 中 Airflow 的默认版本)。

【问题讨论】:

【参考方案1】:

我不确定这是否已经解决,所以:

使用 f 字符串:

f" task_instance.xcom_pull(task_ids='steps[i]', key='return_value')[0] "

使用.format" task_instance.xcom_pull(task_ids='', key='return_value')[0] ".format(steps[i])

请注意,您必须确保键 task_ids 的值用单引号括起来。此外,xcom_pull 的返回是一个列表,因此索引 [0] 在末尾 o

【讨论】:

这是我要找的,[0] 有帮助,否则代码无法获取正确的步骤 ID。

以上是关于无法在带有气流的 jinja 模板中使用 python 变量的主要内容,如果未能解决你的问题,请参考以下文章

带有 jinja2 模板的 django allauth

带有缓存加载器的 Jinja2 与 django 模板 - 性能比较如何?

使用 jinja2 作为 django 模板引擎时出错:无法导入名称“环境”

使用 Flask 和 Jinja2 单击时无法加载模板 HTML [重复]

在 jinja 模板中使用索引访问列表数据

Jinja2 设置块在 Django 中无法识别