具有 SSH 连接的 Airflow DAG 无法按计划同时开始运行

Posted

技术标签:

【中文标题】具有 SSH 连接的 Airflow DAG 无法按计划同时开始运行【英文标题】:Airflow DAGs with SSH connection can't start running in the same time by schedule 【发布时间】:2021-01-12 10:28:54 【问题描述】:

我在使用 Ubuntu 运行 Win 10 的本地计算机上设置了 Airflow 2.0。我使用 PostgreSQL 作为数据库,使用 CeleryExecutor 和 RabbitMQ 作为 Celery 后端。我创建了一些 DAG,每个 DAG 通过 SSH 隧道连接到 Redshift 数据库并执行 SQL 命令。当我手动触发或通过调度程序运行时,每个 DAG 运行顺利。

但是,当我为这些 DAG 同时开始运行设置计划时遇到错误。例如,如果 DAG1 和 DAG2 在上午 8:00 开始运行,这 2 个 dag 将失败并显示以下错误:

psycopg2.OperationalError: 服务器意外关闭连接 这可能意味着服务器异常终止 在处理请求之前或期间。

如果我将这 2 个 dag 设置在不同的时间开始,一切都会顺利进行。另外,如果我将 2 个 dag 组合成 1 个 dag 和 2 个任务,这个组合的 dag 运行良好。

这是我的 DAG 代码,每个 dag 都相同(只是 SQL 命令不同):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
import time

dag = DAG('test', description='Simple test tutorial DAG',
          schedule_interval= None,
          start_date=datetime(2021, 1, 6), tags = ['test'])

def select_from_tunnel_db():
    # Open SSH tunnel
    ssh_hook = SSHHook(ssh_conn_id='dw_ssh')
    tunnel = ssh_hook.get_tunnel(remote_port = 5439, remote_host='**.**.**.**', local_port=5439)
    tunnel.start()

    # Connect to DB and run query
    pg_hook = PostgresHook(
    postgres_conn_id='dw', # NOTE: host='localhost'
    schema='test'
    )
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    cursor.execute('''
    insert into abc values (1, 'a')
    ''')
    cursor.close()
    conn.commit()
    conn.close()


python_operator = PythonOperator(
task_id='test_tunnel_conn',
python_callable=select_from_tunnel_db,
)

【问题讨论】:

【参考方案1】:

我找到了解决方案,所以我回来更新它。希望它有用。

隧道有一个超时间隔(我不知道确切的默认值,但很确定它小于 1 秒),所以我们需要将它设置得更大。创建隧道后再添加 1 行代码:

sshtunnel.SSH_TIMEOUT = sshtunnel.TUNNEL_TIMEOUT = 5.0

【讨论】:

以上是关于具有 SSH 连接的 Airflow DAG 无法按计划同时开始运行的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 BigQuery 连接将 use_legacy_sql=False 传递给 Airflow DAG 中的 SqlSensor?

AirFlow 管理界面使用

Airflow dag 中的 postgres_operator 问题

由于执行日期和开始日期,无法运行 Airflow 任务

如何向 Airflow 添加新的 DAG?

在 DAG 运行期间动态生成 DAG - Airflow