气流 - 如何将 xcom 变量传递给 Python 函数
Posted
技术标签:
【中文标题】气流 - 如何将 xcom 变量传递给 Python 函数【英文标题】:Airflow - How to pass xcom variable into Python function 【发布时间】:2018-02-13 23:55:29 【问题描述】:我需要引用BashOperator
返回的变量。在我的task_archive_s3_file
中,我需要从get_s3_file
获取文件名。该任务只是将 ti.xcom_pull(task_ids=submit_file_to_spark)
打印为字符串而不是值。
如果我使用bash_command
,值会正确打印。
get_s3_file = PythonOperator(
task_id='get_s3_file',
python_callable=obj.func_get_s3_file,
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag)
submit_file_to_spark = BashOperator(
task_id='submit_file_to_spark',
bash_command="echo 'hello world'",
trigger_rule="all_done",
xcom_push=True,
dag=dag)
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
# bash_command="echo ti.xcom_pull(task_ids='submit_file_to_spark') ",
python_callable=obj.func_archive_s3_file,
params='s3_path_filename': " ti.xcom_pull(task_ids=submit_file_to_spark) " ,
dag=dag)
get_s3_file >> submit_file_to_spark >> task_archive_s3_file
【问题讨论】:
【参考方案1】: ti.xcom_pull(...)
之类的模板只能在支持模板的参数内部使用,否则它们不会在执行前呈现。查看PythonOperator 和BashOperator 的template_fields
和template_ext
属性。
所以templates_dict
是您用来将模板传递给您的python 运算符的:
def func_archive_s3_file(**context):
archive(context['templates_dict']['s3_path_filename'])
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
dag=dag,
python_callable=obj.func_archive_s3_file,
provide_context=True, # must pass this because templates_dict gets passed via context
templates_dict='s3_path_filename': " ti.xcom_pull(task_ids='submit_file_to_spark') " )
但是,在获取 XCom 值的情况下,另一种选择是使用通过上下文提供给您的 TaskInstance
对象:
def func_archive_s3_file(**context):
archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
dag=dag,
python_callable=obj.func_archive_s3_file,
provide_context=True,
【讨论】:
愚蠢的问题,但如果我通过xcom_pull
使用最后一个示例,它会重新运行所述任务吗?我假设 xcoms 从一个任务传递到另一个任务(按顺序)。在我的示例中,我需要从第一个任务中给出的文件名。
不,它不会重新运行任务。 XCom 推/拉只是根据 DAG id、执行日期、任务 id 和密钥从气流数据库中的 xcom 表中添加/检索一行。像您已经拥有的那样声明 submit_file_to_spark >> task_archive_s3_file
的依赖项应该足以确保文件名在检索之前被推送到 xcom。
太糟糕了,你只能通过这种方式传递字符串。我想传递对象。
@Julio 从技术上讲,您可以通过 XCom 发送腌制对象,但出于安全考虑,它将在 Airflow 2.0 中被弃用,有关更多详细信息,请参阅相关的 enable_xcom_pickling
配置。
@tatlar 你能看看类似的问题吗:***.com/questions/67631581/…【参考方案2】:
对问题和答案都投了赞成票,但我认为对于那些只想在其 DAG 中的 PythonOperator
任务之间传递小数据对象的用户来说,这可以更清楚一点。参考这个问题和this XCom example 让我得到了以下解决方案。超级简单:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
DAG = DAG(
dag_id='example_dag',
start_date=datetime.now(),
schedule_interval='@once'
)
def push_function(**kwargs):
ls = ['a', 'b', 'c']
return ls
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)
def pull_function(**kwargs):
ti = kwargs['ti']
ls = ti.xcom_pull(task_ids='push_task')
print(ls)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)
push_task >> pull_task
我不确定为什么会这样,但确实如此。向社区提出的几个问题:
ti
在这里发生了什么? **kwargs
是如何内置的?
这两个功能都需要provide_context=True
吗?
非常欢迎任何使这个答案更清晰的编辑!
【讨论】:
这是可行的,因为任何返回值的任务都存储在 xcom (source) 中。所以PythonOperator
的任何返回值都会保存到 XCom(需要小心!)。使用引用的**kwargs
需要提供上下文,我通常将其命名为**context
。此上下文与您在 jinja 模板 (source) 中获得的上下文相同。上下文提供了许多特定于 DAG 运行的有用信息。
@DanielHuang 很棒,很好的解释。感谢您的帮助!
@DanielHuang 再问一个问题:provide_context
是推拉都需要,还是只推?
两者都有!要推送或拉取,您需要访问当前运行的TaskInstance
对象,该对象只能通过context
获得。
请添加此导入语句 from datetime import datetime【参考方案3】:
使用相同的代码并修改参数,如Startdate
等。
import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
args =
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
DAG = DAG(
dag_id='simple_xcom',
default_args=args,
# start_date=datetime(2019, 04, 21),
schedule_interval="@daily",
#schedule_interval=timedelta(1)
)
def push_function(**context):
msg='the_message'
print("message to push: '%s'" % msg)
task_instance = context['task_instance']
task_instance.xcom_push(key="the_message", value=msg)
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)
def pull_function(**kwargs):
ti = kwargs['ti']
msg = ti.xcom_pull(task_ids='push_task',key='the_message')
print("received message: '%s'" % msg)
pull_task = PythonOperator(`enter code here`
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)
push_task >> pull_task
如果你想知道context['task_instance']
和kwargs['ti']
是从哪里来的,可以参考Airflow macro documentation
【讨论】:
ti
和 'task_instance' 的名称从何而来?
@LiuWeibo 查看 Airflow 宏:airflow.apache.org/code.html#macros
气流宏链接更新:airflow.apache.org/docs/stable/macros-ref.html
@Kiwy 你可以看看类似的问题***.com/questions/67631581/…
ti 和 task_instance 都是一样的以上是关于气流 - 如何将 xcom 变量传递给 Python 函数的主要内容,如果未能解决你的问题,请参考以下文章