Testing Airflow: task with DAG and task context not running in pytest

【Posted】2021-02-17 17:19:55 【Question】:

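I am using pytest to test an Airflow task that requires a task context (and therefore a DAG). There are two files:

    conftest.py: the file containing the pytest fixtures
    test_bash.py: the file containing the task test

Here is the fixture for the DAG: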

import datetime
import pytest
from airflow import DAG

@pytest.fixture
def test_dag():
    args = {'owner': 'airflow', 'start_date': datetime.datetime(2021, 1, 1)}
    return DAG('test_dag', default_args=args, schedule_interval='@daily')

Here is the fixture for the postgres_container that I use as the Airflow metastore:

from pytest_docker_tools import container, fetch

postgres_image = fetch(repository='postgres:13')
postgres_container = container(
    image='{postgres_image.id}',
    scope='function',
    ports={'5432/tcp': None},
    environment={
        'POSTGRES_USER': 'airflow',
        'POSTGRES_PASSWORD': 'airflow',
    },
)

Here is the actual test:

import os
import ipaddress
import subprocess

from airflow.operators.bash import BashOperator


def test_bash_so(test_dag, postgres_container):
    print(
        f"Running PostgreSQL container named {postgres_container.name} "
        f"on port {postgres_container.ports['5432/tcp'][0]}."
    )
    postgres_ip = ipaddress.ip_address(postgres_container.ips.primary)

    # set sql_alchemy_conn to the URI of the postgres_container
    env = os.environ.copy()
    env['AIRFLOW__CORE__SQL_ALCHEMY_CONN'] = (
        f'postgresql+psycopg2://airflow:airflow@{postgres_ip}:5432/airflow')

    # initialize the Airflow metastore
    p = subprocess.run(['airflow', 'db', 'init'], capture_output=True, env=env)
    p.check_returncode()
    print(p.stdout)

    # check that the Airflow metastore was initialized successfully
    p = subprocess.run(['airflow', 'db', 'check'], capture_output=True, env=env)
    p.check_returncode()
    print(p.stdout)

    my_file = '/tmp/hello.txt'

    task = BashOperator(
        task_id='test_bash',
        bash_command=f'echo hello > {my_file}',
        dag=test_dag
    )

    task.run(
        start_date=test_dag.default_args['start_date'],
        end_date=test_dag.default_args['start_date']
    )

    with open(my_file, 'r') as f:
        assert f.read().replace('\n', '') == 'hello'
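
The two subprocess.run calls in the test above capture stdout as bytes, so print(p.stdout) emits a bytes literal rather than readable log lines. A minimal sketch of a hypothetical helper (run_and_decode is not part of Airflow or pytest) that decodes the output and raises on a non-zero exit code could look like this; the command shown is a stand-in so the sketch runs without Airflow installed:

```python
import subprocess
import sys


def run_and_decode(cmd, env=None):
    """Run cmd, raise CalledProcessError on a non-zero exit, return stdout as str."""
    # text=True decodes stdout/stderr; check=True replaces a manual check_returncode().
    result = subprocess.run(cmd, capture_output=True, text=True, check=True, env=env)
    return result.stdout


# Stand-in command (an assumption for illustration); in the test above this
# would be ['airflow', 'db', 'init'] with env=env.
output = run_and_decode([sys.executable, '-c', "print('Initialization done')"])
print(output)
```

With this shape, the log lines print one per row instead of as a single escaped string.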

I believe the task never actually runs, because the file is never created.

Here is the output when I run pytest -rP:

=============================================================================================================== FAILURES ===============================================================================================================
_____________________________________________________________________________________________________________ test_bash_so _____________________________________________________________________________________________________________

test_dag = <DAG: test_dag>, postgres_container = <pytest_docker_tools.wrappers.container.Container object at 0x7f085c8a1160>

    def test_bash_so(test_dag, postgres_container):
        print(
           f"Running PostgreSQL container named {postgres_container.name} "
           f"on port {postgres_container.ports['5432/tcp'][0]}."
        )
        postgres_ip = ipaddress.ip_address(postgres_container.ips.primary)

        env = os.environ.copy()
        env['AIRFLOW__CORE__SQL_ALCHEMY_CONN'] = (
            f'postgresql+psycopg2://airflow:airflow@{postgres_ip}:5432/airflow')

        p = subprocess.run(['airflow', 'db', 'init'], capture_output=True, env=env)
        p.check_returncode()
        print(p.stdout)

        p = subprocess.run(['airflow', 'db', 'check'], capture_output=True, env=env)
        p.check_returncode()
        print(p.stdout)

        my_file = '/tmp/hello.txt'

        task = BashOperator(
            task_id='test_bash',
            bash_command=f'echo hello > {my_file}',
            dag=test_dag
        )

        task.run(
            start_date=test_dag.default_args['start_date'],
            end_date=test_dag.default_args['start_date']
        )

>       with open(my_file, 'r') as f:
E       FileNotFoundError: [Errno 2] No such file or directory: '/tmp/hello.txt'

tests/plugins/custom_operators/test_bash_so.py:40: FileNotFoundError
-------------------------------------------------------------------------------------------------------- Captured stdout setup ---------------------------------------------------------------------------------------------------------
Fetching postgres:13
Waiting for container to be ready..
--------------------------------------------------------------------------------------------------------- Captured stdout call ---------------------------------------------------------------------------------------------------------
Running PostgreSQL container named vigilant_austin on port 49237.
b"DB: postgresql+psycopg2://airflow:***@172.17.0.2:5432/airflow\n[2021-02-17 18:07:21,798] db.py:678 INFO - Creating tables\n[2021-02-17 18:07:23,478] manager.py:727 WARNING - No user yet created, use flask fab command to do it.\n[2021-02-17 18:07:25,526] migration.py:555 INFO - Running upgrade 2c6edca13270 -> 61ec73d9401f, Add description field to connection\n[2021-02-17 18:07:25,528] migration.py:555 INFO - Running upgrade 61ec73d9401f -> 64a7d6477aae, fix description field in connection to be text\n[2021-02-17 18:07:25,529] migration.py:555 INFO - Running upgrade 64a7d6477aae -> e959f08ac86c, Change field in DagCode to MEDIUMTEXT for mysql\n[2021-02-17 18:07:25,660] dagbag.py:440 INFO - Filling up the DagBag from /home/denis/airflow/dags\n[2021-02-17 18:07:25,692] example_kubernetes_executor_config.py:174 WARNING - Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'\n[2021-02-17 18:07:25,692] example_kubernetes_executor_config.py:175 WARNING - Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']\n[2021-02-17 18:07:25,713] dag.py:1813 INFO - Sync 28 DAGs\n[2021-02-17 18:07:25,717] dag.py:1832 INFO - Creating ORM DAG for example_xcom_args_with_operators\n[2021-02-17 18:07:25,717] dag.py:1832 INFO - Creating ORM DAG for tutorial\n[2021-02-17 18:07:25,718] dag.py:1832 INFO - Creating ORM DAG for latest_only\n[2021-02-17 18:07:25,718] dag.py:1832 INFO - Creating ORM DAG for example_trigger_target_dag\n[2021-02-17 18:07:25,718] dag.py:1832 INFO - Creating ORM DAG for example_subdag_operator.section-1\n[2021-02-17 18:07:25,718] dag.py:1832 INFO - Creating ORM DAG for example_trigger_controller_dag\n[2021-02-17 18:07:25,718] dag.py:1832 INFO - Creating ORM DAG for example_passing_params_via_test_command\n[2021-02-17 18:07:25,719] dag.py:1832 INFO - Creating ORM DAG for example_xcom\n[2021-02-17 18:07:25,719] dag.py:1832 INFO - Creating ORM DAG for latest_only_with_trigger\n[2021-02-17 18:07:25,719] 
dag.py:1832 INFO - Creating ORM DAG for example_kubernetes_executor\n[2021-02-17 18:07:25,719] dag.py:1832 INFO - Creating ORM DAG for example_subdag_operator\n[2021-02-17 18:07:25,719] dag.py:1832 INFO - Creating ORM DAG for example_subdag_operator.section-2\n[2021-02-17 18:07:25,719] dag.py:1832 INFO - Creating ORM DAG for tutorial_etl_dag\n[2021-02-17 18:07:25,720] dag.py:1832 INFO - Creating ORM DAG for example_short_circuit_operator\n[2021-02-17 18:07:25,720] dag.py:1832 INFO - Creating ORM DAG for example_python_operator\n[2021-02-17 18:07:25,720] dag.py:1832 INFO - Creating ORM DAG for example_complex\n[2021-02-17 18:07:25,720] dag.py:1832 INFO - Creating ORM DAG for example_nested_branch_dag\n[2021-02-17 18:07:25,720] dag.py:1832 INFO - Creating ORM DAG for example_dag_decorator\n[2021-02-17 18:07:25,721] dag.py:1832 INFO - Creating ORM DAG for example_external_task_marker_child\n[2021-02-17 18:07:25,721] dag.py:1832 INFO - Creating ORM DAG for example_external_task_marker_parent\n[2021-02-17 18:07:25,721] dag.py:1832 INFO - Creating ORM DAG for example_task_group\n[2021-02-17 18:07:25,721] dag.py:1832 INFO - Creating ORM DAG for example_branch_dop_operator_v3\n[2021-02-17 18:07:25,721] dag.py:1832 INFO - Creating ORM DAG for example_branch_operator\n[2021-02-17 18:07:25,722] dag.py:1832 INFO - Creating ORM DAG for example_xcom_args\n[2021-02-17 18:07:25,722] dag.py:1832 INFO - Creating ORM DAG for example_bash_operator\n[2021-02-17 18:07:25,722] dag.py:1832 INFO - Creating ORM DAG for example_skip_dag\n[2021-02-17 18:07:25,722] dag.py:1832 INFO - Creating ORM DAG for tutorial_taskflow_api_etl\n[2021-02-17 18:07:25,722] dag.py:1832 INFO - Creating ORM DAG for test_utils\n[2021-02-17 18:07:25,730] dag.py:2266 INFO - Setting next_dagrun for example_bash_operator to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,741] dag.py:2266 INFO - Setting next_dagrun for example_branch_dop_operator_v3 to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,746] dag.py:2266 
INFO - Setting next_dagrun for example_branch_operator to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,746] dag.py:2266 INFO - Setting next_dagrun for example_complex to None\n[2021-02-17 18:07:25,747] dag.py:2266 INFO - Setting next_dagrun for example_dag_decorator to None\n[2021-02-17 18:07:25,747] dag.py:2266 INFO - Setting next_dagrun for example_external_task_marker_child to None\n[2021-02-17 18:07:25,747] dag.py:2266 INFO - Setting next_dagrun for example_external_task_marker_parent to None\n[2021-02-17 18:07:25,747] dag.py:2266 INFO - Setting next_dagrun for example_kubernetes_executor to None\n[2021-02-17 18:07:25,752] dag.py:2266 INFO - Setting next_dagrun for example_nested_branch_dag to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,763] dag.py:2266 INFO - Setting next_dagrun for example_passing_params_via_test_command to 2021-02-16 00:00:00+00:00\n[2021-02-17 18:07:25,763] dag.py:2266 INFO - Setting next_dagrun for example_python_operator to None\n[2021-02-17 18:07:25,764] dag.py:2266 INFO - Setting next_dagrun for example_short_circuit_operator to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,764] dag.py:2266 INFO - Setting next_dagrun for example_skip_dag to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,764] dag.py:2266 INFO - Setting next_dagrun for example_subdag_operator to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,764] dag.py:2266 INFO - Setting next_dagrun for example_subdag_operator.section-1 to None\n[2021-02-17 18:07:25,764] dag.py:2266 INFO - Setting next_dagrun for example_subdag_operator.section-2 to None\n[2021-02-17 18:07:25,764] dag.py:2266 INFO - Setting next_dagrun for example_task_group to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,765] dag.py:2266 INFO - Setting next_dagrun for example_trigger_controller_dag to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,765] dag.py:2266 INFO - Setting next_dagrun for example_trigger_target_dag to None\n[2021-02-17 18:07:25,765] dag.py:2266 INFO - Setting next_dagrun for 
example_xcom to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,765] dag.py:2266 INFO - Setting next_dagrun for example_xcom_args to None\n[2021-02-17 18:07:25,765] dag.py:2266 INFO - Setting next_dagrun for example_xcom_args_with_operators to None\n[2021-02-17 18:07:25,766] dag.py:2266 INFO - Setting next_dagrun for latest_only to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,766] dag.py:2266 INFO - Setting next_dagrun for latest_only_with_trigger to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,766] dag.py:2266 INFO - Setting next_dagrun for test_utils to None\n[2021-02-17 18:07:25,766] dag.py:2266 INFO - Setting next_dagrun for tutorial to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,767] dag.py:2266 INFO - Setting next_dagrun for tutorial_etl_dag to None\n[2021-02-17 18:07:25,767] dag.py:2266 INFO - Setting next_dagrun for tutorial_taskflow_api_etl to None\n[2021-02-17 18:07:25,792] dag.py:1813 INFO - Sync 2 DAGs\n[2021-02-17 18:07:25,799] dag.py:2266 INFO - Setting next_dagrun for example_subdag_operator.section-1 to None\n[2021-02-17 18:07:25,800] dag.py:2266 INFO - Setting next_dagrun for example_subdag_operator.section-2 to None\nInitialization done\n"
b'[2021-02-17 18:07:26,759] db.py:756 INFO - Connection successful.\n'
------------------------------------------------------------------------------------------------- postgres_container: vigilant_austin --------------------------------------------------------------------------------------------------
The files belonging to this database system will be owned by user "postgres".
This user must also own the server process.

The database cluster will be initialized with locale "en_US.utf8".
The default database encoding has accordingly been set to "UTF8".
The default text search configuration will be set to "english".

Data page checksums are disabled.

fixing permissions on existing directory /var/lib/postgresql/data ... ok
creating subdirectories ... ok
selecting dynamic shared memory implementation ... posix
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting default time zone ... Etc/UTC
creating configuration files ... ok
running bootstrap script ... ok
performing post-bootstrap initialization ... ok
syncing data to disk ... ok


Success. You can now start the database server using:

    pg_ctl -D /var/lib/postgresql/data -l logfile start

initdb: warning: enabling "trust" authentication for local connections
You can change this by editing pg_hba.conf or using the option -A, or
--auth-local and --auth-host, the next time you run initdb.
waiting for server to start....2021-02-17 17:07:20.664 UTC [53] LOG:  starting PostgreSQL 13.2 (Debian 13.2-1.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
2021-02-17 17:07:20.665 UTC [53] LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
2021-02-17 17:07:20.667 UTC [54] LOG:  database system was shut down at 2021-02-17 17:07:20 UTC
2021-02-17 17:07:20.671 UTC [53] LOG:  database system is ready to accept connections
 done
server started
CREATE DATABASE


/usr/local/bin/docker-entrypoint.sh: ignoring /docker-entrypoint-initdb.d/*

2021-02-17 17:07:20.898 UTC [53] LOG:  received fast shutdown request
waiting for server to shut down....2021-02-17 17:07:20.899 UTC [53] LOG:  aborting any active transactions
2021-02-17 17:07:20.900 UTC [53] LOG:  background worker "logical replication launcher" (PID 60) exited with exit code 1
2021-02-17 17:07:20.900 UTC [55] LOG:  shutting down
2021-02-17 17:07:20.908 UTC [53] LOG:  database system is shut down
 done
server stopped

PostgreSQL init process complete; ready for start up.

2021-02-17 17:07:21.019 UTC [1] LOG:  starting PostgreSQL 13.2 (Debian 13.2-1.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
2021-02-17 17:07:21.019 UTC [1] LOG:  listening on IPv4 address "0.0.0.0", port 5432
2021-02-17 17:07:21.019 UTC [1] LOG:  listening on IPv6 address "::", port 5432
2021-02-17 17:07:21.020 UTC [1] LOG:  listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
2021-02-17 17:07:21.024 UTC [81] LOG:  database system was shut down at 2021-02-17 17:07:20 UTC
2021-02-17 17:07:21.027 UTC [1] LOG:  database system is ready to accept connections
2021-02-17 17:07:21.829 UTC [94] ERROR:  relation "connection" does not exist at character 569
2021-02-17 17:07:21.829 UTC [94] STATEMENT:  SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.description AS connection_description, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted, count(connection.conn_id) AS count_1
        FROM connection GROUP BY connection.conn_id
        HAVING count(connection.conn_id) > 1
2021-02-17 17:07:21.830 UTC [94] ERROR:  current transaction is aborted, commands ignored until end of transaction block
2021-02-17 17:07:21.830 UTC [94] STATEMENT:  SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.description AS connection_description, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted
        FROM connection
        WHERE connection.conn_type IS NULL
================================================================================================================ PASSES ================================================================================================================
=============================================================================================== 1 failed, 2 passed, 3 skipped in 11.83s ================================================================================================

Interestingly, if I comment out the actual task.run() call and everything after it, the Airflow metastore appears to be initialized correctly:

______________________________________________________________________________________________________________ test_bash_so _______________________________________________________________________________________________________________
-------------------------------------------------------------------------------------------------------- Captured stdout setup ---------------------------------------------------------------------------------------------------------
Fetching postgres:13
Waiting for container to be ready..
--------------------------------------------------------------------------------------------------------- Captured stdout call ---------------------------------------------------------------------------------------------------------
Running PostgreSQL container named tender_rubin on port 49234.
DB: postgresql+psycopg2://airflow:***@172.17.0.2:5432/airflow
[2021-02-17 17:52:31,680] db.py:678 INFO - Creating tables
[2021-02-17 17:52:33,363] manager.py:727 WARNING - No user yet created, use flask fab command to do it.
[2021-02-17 17:52:35,409] migration.py:555 INFO - Running upgrade 2c6edca13270 -> 61ec73d9401f, Add description field to connection
[2021-02-17 17:52:35,411] migration.py:555 INFO - Running upgrade 61ec73d9401f -> 64a7d6477aae, fix description field in connection to be text
[2021-02-17 17:52:35,413] migration.py:555 INFO - Running upgrade 64a7d6477aae -> e959f08ac86c, Change field in DagCode to MEDIUMTEXT for MySql
[2021-02-17 17:52:35,547] dagbag.py:440 INFO - Filling up the DagBag from /home/denis/airflow/dags
[2021-02-17 17:52:35,570] example_kubernetes_executor_config.py:174 WARNING - Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'
[2021-02-17 17:52:35,570] example_kubernetes_executor_config.py:175 WARNING - Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']
[2021-02-17 17:52:35,592] dag.py:1813 INFO - Sync 28 DAGs
[2021-02-17 17:52:35,596] dag.py:1832 INFO - Creating ORM DAG for example_branch_operator
[2021-02-17 17:52:35,596] dag.py:1832 INFO - Creating ORM DAG for example_external_task_marker_child
[2021-02-17 17:52:35,596] dag.py:1832 INFO - Creating ORM DAG for example_trigger_controller_dag
[2021-02-17 17:52:35,597] dag.py:1832 INFO - Creating ORM DAG for example_short_circuit_operator
[2021-02-17 17:52:35,597] dag.py:1832 INFO - Creating ORM DAG for example_branch_dop_operator_v3
[2021-02-17 17:52:35,597] dag.py:1832 INFO - Creating ORM DAG for example_passing_params_via_test_command
[2021-02-17 17:52:35,597] dag.py:1832 INFO - Creating ORM DAG for latest_only_with_trigger
[2021-02-17 17:52:35,597] dag.py:1832 INFO - Creating ORM DAG for tutorial_taskflow_api_etl
[2021-02-17 17:52:35,598] dag.py:1832 INFO - Creating ORM DAG for example_external_task_marker_parent
[2021-02-17 17:52:35,598] dag.py:1832 INFO - Creating ORM DAG for example_bash_operator
[2021-02-17 17:52:35,598] dag.py:1832 INFO - Creating ORM DAG for example_subdag_operator
[2021-02-17 17:52:35,598] dag.py:1832 INFO - Creating ORM DAG for example_python_operator
[2021-02-17 17:52:35,598] dag.py:1832 INFO - Creating ORM DAG for example_dag_decorator
[2021-02-17 17:52:35,599] dag.py:1832 INFO - Creating ORM DAG for example_subdag_operator.section-2
[2021-02-17 17:52:35,599] dag.py:1832 INFO - Creating ORM DAG for tutorial
[2021-02-17 17:52:35,599] dag.py:1832 INFO - Creating ORM DAG for test_utils
[2021-02-17 17:52:35,599] dag.py:1832 INFO - Creating ORM DAG for example_kubernetes_executor
[2021-02-17 17:52:35,599] dag.py:1832 INFO - Creating ORM DAG for latest_only
[2021-02-17 17:52:35,600] dag.py:1832 INFO - Creating ORM DAG for example_xcom
[2021-02-17 17:52:35,600] dag.py:1832 INFO - Creating ORM DAG for example_nested_branch_dag
[2021-02-17 17:52:35,600] dag.py:1832 INFO - Creating ORM DAG for example_trigger_target_dag
[2021-02-17 17:52:35,600] dag.py:1832 INFO - Creating ORM DAG for example_xcom_args_with_operators
[2021-02-17 17:52:35,600] dag.py:1832 INFO - Creating ORM DAG for example_xcom_args
[2021-02-17 17:52:35,601] dag.py:1832 INFO - Creating ORM DAG for example_task_group
[2021-02-17 17:52:35,601] dag.py:1832 INFO - Creating ORM DAG for example_subdag_operator.section-1
[2021-02-17 17:52:35,601] dag.py:1832 INFO - Creating ORM DAG for tutorial_etl_dag
[2021-02-17 17:52:35,601] dag.py:1832 INFO - Creating ORM DAG for example_complex
[2021-02-17 17:52:35,601] dag.py:1832 INFO - Creating ORM DAG for example_skip_dag
[2021-02-17 17:52:35,609] dag.py:2266 INFO - Setting next_dagrun for example_bash_operator to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,621] dag.py:2266 INFO - Setting next_dagrun for example_branch_dop_operator_v3 to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,625] dag.py:2266 INFO - Setting next_dagrun for example_branch_operator to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,625] dag.py:2266 INFO - Setting next_dagrun for example_complex to None
[2021-02-17 17:52:35,626] dag.py:2266 INFO - Setting next_dagrun for example_dag_decorator to None
[2021-02-17 17:52:35,626] dag.py:2266 INFO - Setting next_dagrun for example_external_task_marker_child to None
[2021-02-17 17:52:35,626] dag.py:2266 INFO - Setting next_dagrun for example_external_task_marker_parent to None
[2021-02-17 17:52:35,626] dag.py:2266 INFO - Setting next_dagrun for example_kubernetes_executor to None
[2021-02-17 17:52:35,631] dag.py:2266 INFO - Setting next_dagrun for example_nested_branch_dag to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,642] dag.py:2266 INFO - Setting next_dagrun for example_passing_params_via_test_command to 2021-02-16 00:00:00+00:00
[2021-02-17 17:52:35,642] dag.py:2266 INFO - Setting next_dagrun for example_python_operator to None
[2021-02-17 17:52:35,643] dag.py:2266 INFO - Setting next_dagrun for example_short_circuit_operator to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,643] dag.py:2266 INFO - Setting next_dagrun for example_skip_dag to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,643] dag.py:2266 INFO - Setting next_dagrun for example_subdag_operator to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,643] dag.py:2266 INFO - Setting next_dagrun for example_subdag_operator.section-1 to None
[2021-02-17 17:52:35,643] dag.py:2266 INFO - Setting next_dagrun for example_subdag_operator.section-2 to None
[2021-02-17 17:52:35,644] dag.py:2266 INFO - Setting next_dagrun for example_task_group to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,644] dag.py:2266 INFO - Setting next_dagrun for example_trigger_controller_dag to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,644] dag.py:2266 INFO - Setting next_dagrun for example_trigger_target_dag to None
[2021-02-17 17:52:35,644] dag.py:2266 INFO - Setting next_dagrun for example_xcom to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,644] dag.py:2266 INFO - Setting next_dagrun for example_xcom_args to None
[2021-02-17 17:52:35,645] dag.py:2266 INFO - Setting next_dagrun for example_xcom_args_with_operators to None
[2021-02-17 17:52:35,645] dag.py:2266 INFO - Setting next_dagrun for latest_only to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,645] dag.py:2266 INFO - Setting next_dagrun for latest_only_with_trigger to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,645] dag.py:2266 INFO - Setting next_dagrun for test_utils to None
[2021-02-17 17:52:35,645] dag.py:2266 INFO - Setting next_dagrun for tutorial to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,646] dag.py:2266 INFO - Setting next_dagrun for tutorial_etl_dag to None
[2021-02-17 17:52:35,646] dag.py:2266 INFO - Setting next_dagrun for tutorial_taskflow_api_etl to None
[2021-02-17 17:52:35,671] dag.py:1813 INFO - Sync 2 DAGs
[2021-02-17 17:52:35,677] dag.py:2266 INFO - Setting next_dagrun for example_subdag_operator.section-1 to None
[2021-02-17 17:52:35,677] dag.py:2266 INFO - Setting next_dagrun for example_subdag_operator.section-2 to None
Initialization done

[2021-02-17 17:52:36,652] db.py:756 INFO - Connection successful.

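So I wonder: could the task.run() code after the subprocess.run() calls somehow affect the metastore initialization? Maybe the task runs before the metastore is initialized? Or maybe the task is not running at all for some other reason?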

【Answer 1】:

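So the problem arose because the Airflow task started before the metastore was initialized. Forcing the task to wait until the postgres container is up and the airflow db init call has succeeded is complicated.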
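
To illustrate the kind of waiting described here, the following is a minimal sketch of a hypothetical helper, wait_for_port, that polls until a TCP port accepts connections. The name and parameters are assumptions for illustration; it is not part of Airflow or pytest-docker-tools, and an open port does not by itself guarantee that Postgres is ready to serve queries.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError if the port is not reachable yet.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

In a test like the one in the question, such a helper could be called with the container's IP and port 5432 before running airflow db init, though the extra moving parts are part of what makes this approach cumbersome.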

Instead, I simply use a sqlite metastore that is deleted at the end of the test:

import os
import subprocess

from airflow.operators.bash import BashOperator
from sqlalchemy.orm.exc import NoResultFound
from pathlib import Path


def test_bash(tmp_path: Path, test_dag, monkeypatch):
    my_file = tmp_path / 'hello.txt'

    metastore_path = os.path.join(os.path.dirname(__file__), 'airflow.db')
    uri = f'sqlite:///{metastore_path}'

    monkeypatch.setenv('AIRFLOW__CORE__SQL_ALCHEMY_CONN', uri)
    monkeypatch.setenv('AIRFLOW__CORE__LOAD_EXAMPLES', 'False')
    monkeypatch.setenv('AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS', 'False')

    init_metastore = subprocess.Popen(['airflow', 'db', 'init'])
    init_metastore.wait()

    task = BashOperator(
        task_id='test_id',
        bash_command=f'echo hello > {my_file}',
        dag=test_dag
    )

    try:
        print('Running task...')
        task.run(
            start_date=test_dag.default_args['start_date'],
            end_date=test_dag.default_args['start_date']
        )
    except NoResultFound:
        print('Caught expected `sqlalchemy.orm.exc.NoResultFound` exception')
    finally:
        os.remove(metastore_path)

    with open(my_file, 'r') as f:
        assert f.read().replace('\n', '') == 'hello'

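I have to catch the sqlalchemy.orm.exc.NoResultFound exception, which is raised the first time a task with a new task_id is run. It is not clear why this exception is raised, since the task succeeds.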
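
One difference between the two tests may be worth illustrating. In the question, the connection URI is placed in a copy of the environment that is passed only to subprocess.run, whereas the sqlite version uses monkeypatch.setenv, which changes the test process's own environment. The sketch below uses a hypothetical variable name and only the standard library to show that a value set on a copied environment is visible to the child process but not to the parent. Whether this accounts for the original failure is an assumption, not something the post confirms.

```python
import os
import subprocess
import sys

VAR = 'DEMO_SQL_ALCHEMY_CONN'  # hypothetical variable name, used only for this demo


def child_and_parent_values(value: str):
    """Set VAR in a copied environment and report what child and parent each see."""
    env = os.environ.copy()
    env[VAR] = value  # modifies the copy only, not os.environ
    child = subprocess.run(
        [sys.executable, '-c', f"import os; print(os.environ.get('{VAR}'))"],
        capture_output=True, text=True, check=True, env=env,
    )
    return child.stdout.strip(), os.environ.get(VAR)


child_value, parent_value = child_and_parent_values('sqlite:////tmp/demo.db')
print(child_value, parent_value)
```

Code that runs inside the test process, such as task.run(), reads the parent's environment, so it would not see a value that was set only on the copy.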
