气流测试模式 xcom 拉/推不工作

Posted

技术标签:

【中文标题】气流测试模式 xcom 拉/推不工作【英文标题】:airflow test mode xcom pull/push not working 【发布时间】:2019-11-15 10:36:50 【问题描述】:

我尝试通过气流 cli test 命令测试 2 个任务`

第一个任务运行,自动将最后一个控制台推送到 xcom,我在气流 GUI 中看到了值 some value,正如预期的那样

当我通过气流 cli 测试命令运行第二个任务时,我只是得到 None 作为返回值,但正如我在这里读到的:How to test Apache Airflow tasks that uses XCom 它应该可以工作,至少 xcom_push 显然可以工作,为什么 xcom_pull 不可以?

有人提示如何使其正常工作? 提供上下文设置为 true。

示例代码:

t1 = BashOperator(
    task_id='t1',
    bash_command='echo "some value"',
    xcom_push=True,
    dag=dag
)

t2 = BashOperator(
    task_id='t2',
    bash_command='echo  ti.xcom_pull(task_ids="t1") ',
    xcom_push=True,
    dag=dag
)

谢谢!

编辑:当我在没有测试模式的情况下运行代码(DAG)时,xcom_pull 工作正常

【问题讨论】:

【参考方案1】:

据我所知,“测试”运行时不会将任何内容保存到元数据数据库中,这就是为什么当您运行 puller 任务时,您会得到“无”结果,而当您实际运行 DAG 代码时,它可以工作。

您可以在测试第一个任务后直接查询元数据数据库以验证这一点。

【讨论】:

这就是问题所在。我会理解什么时候 testmode 从不写入元数据数据库或总是写入。但在我的情况下,它写入元数据库,但随后无法从元数据库中读取写入的值,这有点奇怪。【参考方案2】:

这里好像缺少上下文,连同xcom_push=True,我们需要使用provide_context=True

【讨论】:

以上是关于气流测试模式 xcom 拉/推不工作的主要内容,如果未能解决你的问题,请参考以下文章

气流 - 如何将 xcom 变量传递给 Python 函数

运营商之间的气流和数据传输

气流网络服务器启动 - gunicorn 工作人员正在关闭

使用气流进行实时工作编排

如何显示DockerOperator的日志输出?

如何将数据帧传递到气流任务的临时表中