气流 - Pytest - 未找到夹具“dag”

Posted

技术标签:

【中文标题】气流 - Pytest - 未找到夹具“dag”【英文标题】:Airflow - Pytest - fixture 'dag' not found 【发布时间】:2021-10-01 15:26:45 【问题描述】:

我正在尝试为气流中的 DAG 编写一些完整性测试。我目前正在使用airflow.utils.dag_cycle_tester 中的test_cycle 函数测试循环DAG。我的代码如下:

import glob
import importlib.util
import os
import pytest
from airflow.models import DAG
from airflow.utils.dag_cycle_tester import test_cycle

DAG_PATH = os.path.join(
 os.path.dirname(__file__), "..", "..", "dags/**/*.py"
)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)

print(DAG_FILES)

@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    module_name, _ = os.path.splitext(dag_file)
    module_path = os.path.join(DAG_PATH, dag_file)
    mod_spec = importlib.util.spec_from_file_location(module_name, 
    module_path)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)
    dag_objects = [var for var in vars(module).values() if isinstance(var, 
    DAG)]
    assert dag_objects
    for dag in dag_objects:
        test_cycle(dag)

由于某种原因,当我使用 pytest tests/ 运行它时,我收到以下错误:

___________在设置 test_cycle 时出错 ___________

def test_cycle(dag):

找不到 E 夹具“dag”

可用的夹具:anyio_backend、anyio_backend_name、anyio_backend_options、缓存、capfd、capfdbinary、caplog、capsys、capsysbinary、celery_config、celery_enable_logging、cov、doctest_namespace、monkeypatch、no_cover、pytestconfig、record_property、record_testsuite_property、record_xml_attribute、recwarn、ytmp_path、 tmpdir, tmpdir_factory 使用 'pytest --fixtures [testpath]' 获取帮助。

任何关于导致这种情况的想法都将不胜感激。谢谢!

【问题讨论】:

错误发生在另一个功能上,而不是您显示的功能上,显示该功能 【参考方案1】:

虽然失败实际上是在您所显示的另一个函数中,但我可以解释发生了什么。

问题: Pytest 从test_ 开始挑选每个函数。因此,当您的函数 test_cycletest_ 开头时,它也将作为单独的测试运行。在单独的测试中,它会尝试找到不存在的夹具dag

因此解决方法是: 将函数重命名为 cycle_dag 应该可以修复它:)

【讨论】:

谢谢 Jorrick,这就是问题所在 :)

以上是关于气流 - Pytest - 未找到夹具“dag”的主要内容,如果未能解决你的问题,请参考以下文章

气流未在 /usr/local/airflow/dags 中加载 dag

部署气流代码库

标记一个 Pytest 夹具而不是使用该夹具的所有测试

在气流上部署 dag 文件的有效方法

气流 DAG 步骤依赖项

气流:如何删除 DAG?