气流 - 如何将 xcom 变量传递给 Python 函数

Posted

技术标签:

【中文标题】气流 - 如何将 xcom 变量传递给 Python 函数【英文标题】:Airflow - How to pass xcom variable into Python function 【发布时间】:2018-02-13 23:55:29 【问题描述】:

我需要引用BashOperator 返回的变量。在我的task_archive_s3_file 中,我需要从get_s3_file 获取文件名。该任务只是将 ti.xcom_pull(task_ids=submit_file_to_spark) 打印为字符串而不是值。

如果我使用bash_command,值会正确打印。

get_s3_file = PythonOperator(
    task_id='get_s3_file',
    python_callable=obj.func_get_s3_file,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag)

submit_file_to_spark = BashOperator(
    task_id='submit_file_to_spark',
    bash_command="echo 'hello world'",
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag)

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
#    bash_command="echo  ti.xcom_pull(task_ids='submit_file_to_spark') ",
    python_callable=obj.func_archive_s3_file,
    params='s3_path_filename': " ti.xcom_pull(task_ids=submit_file_to_spark) " ,
    dag=dag)

get_s3_file >> submit_file_to_spark >> task_archive_s3_file

【问题讨论】:

【参考方案1】:

ti.xcom_pull(...) 之类的模板只能在支持模板的参数内部使用,否则它们不会在执行前呈现。查看PythonOperator 和BashOperator 的template_fieldstemplate_ext 属性。

所以templates_dict 是您用来将模板传递给您的python 运算符的:

def func_archive_s3_file(**context):
    archive(context['templates_dict']['s3_path_filename'])

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,  # must pass this because templates_dict gets passed via context
    templates_dict='s3_path_filename': " ti.xcom_pull(task_ids='submit_file_to_spark') " )

但是,在获取 XCom 值的情况下,另一种选择是使用通过上下文提供给您的 TaskInstance 对象:

def func_archive_s3_file(**context):
    archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,

【讨论】:

愚蠢的问题,但如果我通过xcom_pull 使用最后一个示例,它会重新运行所述任务吗?我假设 xcoms 从一个任务传递到另一个任务(按顺序)。在我的示例中,我需要从第一个任务中给出的文件名。 不,它不会重新运行任务。 XCom 推/拉只是根据 DAG id、执行日期、任务 id 和密钥从气流数据库中的 xcom 表中添加/检索一行。像您已经拥有的那样声明 submit_file_to_spark >> task_archive_s3_file 的依赖项应该足以确保文件名在检索之前被推送到 xcom。 太糟糕了,你只能通过这种方式传递字符串。我想传递对象。 @Julio 从技术上讲,您可以通过 XCom 发送腌制对象,但出于安全考虑,它将在 Airflow 2.0 中被弃用,有关更多详细信息,请参阅相关的 enable_xcom_pickling 配置。 @tatlar 你能看看类似的问题吗:***.com/questions/67631581/…【参考方案2】:

对问题和答案都投了赞成票,但我认为对于那些只想在其 DAG 中的 PythonOperator 任务之间传递小数据对象的用户来说,这可以更清楚一点。参考这个问题和this XCom example 让我得到了以下解决方案。超级简单:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

DAG = DAG(
  dag_id='example_dag',
  start_date=datetime.now(),
  schedule_interval='@once'
)

def push_function(**kwargs):
    ls = ['a', 'b', 'c']
    return ls

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    ls = ti.xcom_pull(task_ids='push_task')
    print(ls)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task

我不确定为什么会这样,但确实如此。向社区提出的几个问题:

ti 在这里发生了什么? **kwargs 是如何内置的? 这两个功能都需要provide_context=True 吗?

非常欢迎任何使这个答案更清晰的编辑!

【讨论】:

这是可行的,因为任何返回值的任务都存储在 xcom (source) 中。所以PythonOperator 的任何返回值都会保存到 XCom(需要小心!)。使用引用的**kwargs 需要提供上下文,我通常将其命名为**context。此上下文与您在 jinja 模板 (source) 中获得的上下文相同。上下文提供了许多特定于 DAG 运行的有用信息。 @DanielHuang 很棒,很好的解释。感谢您的帮助! @DanielHuang 再问一个问题:provide_context 是推拉都需要,还是只推? 两者都有!要推送或拉取,您需要访问当前运行的TaskInstance 对象,该对象只能通过context 获得。 请添加此导入语句 from datetime import datetime【参考方案3】:

使用相同的代码并修改参数,如Startdate 等。

import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = 
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),


DAG = DAG(
  dag_id='simple_xcom',
  default_args=args,
#  start_date=datetime(2019, 04, 21),
  schedule_interval="@daily",
  #schedule_interval=timedelta(1)
)

def push_function(**context):
    msg='the_message'
    print("message to push: '%s'" % msg)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="the_message", value=msg)

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='push_task',key='the_message')
    print("received message: '%s'" % msg)

pull_task = PythonOperator(`enter code here`
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task

如果你想知道context['task_instance']kwargs['ti']是从哪里来的,可以参考Airflow macro documentation

【讨论】:

ti 和 'task_instance' 的名称从何而来? @LiuWeibo 查看 Airflow 宏:airflow.apache.org/code.html#macros 气流宏链接更新:airflow.apache.org/docs/stable/macros-ref.html @Kiwy 你可以看看类似的问题***.com/questions/67631581/… ti 和 task_instance 都是一样的

以上是关于气流 - 如何将 xcom 变量传递给 Python 函数的主要内容,如果未能解决你的问题,请参考以下文章

气流测试模式 xcom 拉/推不工作

运营商之间的气流和数据传输

将 xcom 值传递给 Airflow 中的 JiraOperator

通过字典将参数动态传递给气流算子

如何将数据帧传递到气流任务的临时表中

Apache Airflow 中的分布式日志记录