具有 SSH 连接的 Airflow DAG 无法按计划同时开始运行
Posted
技术标签:
【中文标题】具有 SSH 连接的 Airflow DAG 无法按计划同时开始运行【英文标题】:Airflow DAGs with SSH connection can't start running in the same time by schedule 【发布时间】:2021-01-12 10:28:54 【问题描述】:我在使用 Ubuntu 运行 Win 10 的本地计算机上设置了 Airflow 2.0。我使用 PostgreSQL 作为数据库,使用 CeleryExecutor 和 RabbitMQ 作为 Celery 后端。我创建了一些 DAG,每个 DAG 通过 SSH 隧道连接到 Redshift 数据库并执行 SQL 命令。当我手动触发或通过调度程序运行时,每个 DAG 运行顺利。
但是,当我为这些 DAG 同时开始运行设置计划时遇到错误。例如,如果 DAG1 和 DAG2 在上午 8:00 开始运行,这 2 个 dag 将失败并显示以下错误:
psycopg2.OperationalError: 服务器意外关闭连接 这可能意味着服务器异常终止 在处理请求之前或期间。
如果我将这 2 个 dag 设置在不同的时间开始,一切都会顺利进行。另外,如果我将 2 个 dag 组合成 1 个 dag 和 2 个任务,这个组合的 dag 运行良好。
这是我的 DAG 代码,每个 dag 都相同(只是 SQL 命令不同):
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
import time
dag = DAG('test', description='Simple test tutorial DAG',
schedule_interval= None,
start_date=datetime(2021, 1, 6), tags = ['test'])
def select_from_tunnel_db():
# Open SSH tunnel
ssh_hook = SSHHook(ssh_conn_id='dw_ssh')
tunnel = ssh_hook.get_tunnel(remote_port = 5439, remote_host='**.**.**.**', local_port=5439)
tunnel.start()
# Connect to DB and run query
pg_hook = PostgresHook(
postgres_conn_id='dw', # NOTE: host='localhost'
schema='test'
)
conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('''
insert into abc values (1, 'a')
''')
cursor.close()
conn.commit()
conn.close()
python_operator = PythonOperator(
task_id='test_tunnel_conn',
python_callable=select_from_tunnel_db,
)
【问题讨论】:
【参考方案1】:我找到了解决方案,所以我回来更新它。希望它有用。
隧道有一个超时间隔(我不知道确切的默认值,但很确定它小于 1 秒),所以我们需要将它设置得更大。创建隧道后再添加 1 行代码:
sshtunnel.SSH_TIMEOUT = sshtunnel.TUNNEL_TIMEOUT = 5.0
【讨论】:
以上是关于具有 SSH 连接的 Airflow DAG 无法按计划同时开始运行的主要内容,如果未能解决你的问题,请参考以下文章
如何通过 BigQuery 连接将 use_legacy_sql=False 传递给 Airflow DAG 中的 SqlSensor?