如何在 Airflow 中将 DatabaseHook 对象与 PythonOperator 一起使用而不会耗尽连接?

Posted

技术标签:

【中文标题】如何在 Airflow 中将 DatabaseHook 对象与 PythonOperator 一起使用而不会耗尽连接?【英文标题】:How to use DatabaseHook objects with PythonOperator in Airflow without running out of connections? 【发布时间】:2020-06-09 17:20:32 【问题描述】:

我正在尝试使用 Airflow Connections 存储我的数据库凭据并将它们与 PythonOperators 一起使用。我注意到,如果我将凭据传递给 PythonOperator,那么每个变量都会被记录下来,包括数据库密码。因此,根据下面的示例,我将连接对象本身传递给 PythonOperator。

但我现在遇到的问题是气流会产生大量此类对象,即使此 dag 仅计划每天运行,导致经常出现达到连接限制的问题。 如何在不使用大量连接的情况下将 PostgresHook 与 PythonOperator 一起用于 Airflow 中的数据脚本?

import sys
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook

try:
    sys.path.append('/path/to/my/awesome/module/')
    from awesome_module import function_1, function_1
except:
    raise ImportError("Couldn't import awesome_module")

postgres_hook_object = PostgresHook("dedicated_bot_account")


with postgres_hook_object.get_conn() as con:
    t1 = PythonOperator(
            task_id = 'function_1',
            python_callable = function_1, 
            dag = dag,
            op_kwargs = 'conn':con
            )

    t2 = PythonOperator(
            task_id = 'function_2',
            python_callable = function_2,
            dag = dag,
            op_args = [con, service]
            )

【问题讨论】:

【参考方案1】:

从 Airflow Slack 我了解到 DAG 中的代码以调度程序的频率运行,因此每次调度程序刷新 DAG 时都会打开多个连接。

似乎最佳做法是通过以下任一方式确保仅在任务运行时打开连接:

    如果任务是在 DAG 中定义的,请将连接打开代码移至 Python 函数定义中 如果任务在别处定义,则在任务中打开连接。 请注意,如果通过纯文本作为变量传递连接信息,那么这将被记录

【讨论】:

以上是关于如何在 Airflow 中将 DatabaseHook 对象与 PythonOperator 一起使用而不会耗尽连接?的主要内容,如果未能解决你的问题,请参考以下文章

Apache Airflow:将s3复制到s3的运算符

如何在 Windows 上运行 Airflow

如何在 Airflow 中运行 Spark 代码?

如何向 Airflow 添加新的 DAG?

如何在 Airflow 中创建条件任务

如何在 Airflow 中设置 DAG 之间的依赖关系?