使用 Airflow 进行集成测试

Posted

技术标签:

【中文标题】使用 Airflow 进行集成测试【英文标题】:Integration tests with Airflow 【发布时间】:2018-04-26 21:25:30 【问题描述】:

针对 BigQuery 运行 Airflow 集成测试的好方法/推荐方法是什么?

Airflow 似乎有不少专为测试而设计的运算符,例如BigQueryCheckOperator。我正在努力寻找任何可以展示如何将其用于集成测试的最佳实践或示例。

推荐的集成测试方法是:

具体任务 整个 DAG

最好使用用 Python 编写的测试框架,因为 Airflow 也是用 Python 编写的,例如 pytest 或类似的。

【问题讨论】:

您可以查看 dsunit (github.com/viant/dsunit) 的各种验证选项, 免责声明:我一直在为 dsunit 做出贡献 @Datageek 你想出了什么解决方案?我目前正在通过从测试用例中调用气流 CLI 测试 cmd (airflow.apache.org/cli.html#test) 来测试我的 dags 和任务。可能是一种相当缓慢的方法,但它确实有效。 我链接了一种有趣的方法,但是在我看来,由于使用了模拟,这比集成方法更单元化,但仍然可能会有所帮助:blog.godatadriven.com/testing-and-debugging-apache-airflow @andilabs 我们最终为每个生产 DAG 实施了一个测试 DAG。它将 DAG 本身作为 subDAG(在测试数据上)运行,然后运行测试任务,比较实际数据集和预期数据集之间的结果。 【参考方案1】:

你可以像这样在整个 dag 上运行测试:

import unittest
from airflow.models import DagBag

class TestDagIntegrity(unittest.TestCase):

LOAD_SECOND_THRESHOLD = 2

def setUp(self):
    self.dagbag = DagBag()

def test_import_dags(self):
    """ Test if dags works, no fail in import
    """
    self.assertFalse(
        len(self.dagbag.import_errors),
        'DAG import failures. Errors: '.format(
            self.dagbag.import_errors
        )
    )

suite = unittest.TestLoader().loadTestsFromTestCase(TestDagIntegrity)
unittest.TextTestRunner(verbosity=2).run(suite)

如果你的 dags 工作,如果没有导入失败或缺少变量,它将被测试

【讨论】:

那么,有没有办法模拟配置(run_dag.conf、hook.get_connection 和变量)并使用实际数据运行 dag,而不仅仅是导入它?

以上是关于使用 Airflow 进行集成测试的主要内容,如果未能解决你的问题,请参考以下文章

使用Mybatis和不同DB进行集成测试开发时如何做集成测试

集成测试

使用 Flyway 进行集成测试

airflow集成EMR使用

使用 Jooq 进行集成测试

使用 Spring Boot 进行 Kotlin 集成测试