气流 - Python 文件不在同一个 DAG 文件夹中

Posted

技术标签:

【中文标题】气流 - Python 文件不在同一个 DAG 文件夹中【英文标题】:Airflow - Python file NOT in the same DAG folder 【发布时间】:2016-02-04 06:27:49 【问题描述】:

我正在尝试使用 Airflow 来执行一个简单的任务 python。

from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta


from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = 
    'owner': 'airflow',
    'start_date': seven_days_ago,


dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

如果我尝试,例如:

气流测试 python_test print 2015-01-01

有效!

现在我想将我的 def print_context(ds, **kwargs) 函数放在其他 python 文件中。所以我创建了另一个名为:simple_test.py 的文件并更改:

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)

现在我尝试再次运行:

气流测试 python_test print 2015-01-01

好吧!它仍然有效!

但如果我创建一个模块,例如,带有文件SimplePython.py 的工作模块,导入 (from worker import SimplePython) 并尝试:

气流测试 python_test print 2015-01-01

它给出了信息:

ImportError: 没有名为 worker 的模块

问题:

    是否可以在 DAG 定义中导入模块? Airflow+Celery 将如何在工作节点之间分发所有必要的 python 源文件?

【问题讨论】:

【参考方案1】:

您可以按照以下方式打包 DAG 的依赖项:

https://airflow.apache.org/concepts.html#packaged-dags

为此,您可以在 zip 文件的根目录中创建一个包含 dag 的 zip 文件,并将额外的模块解压缩到目录中。 例如,您可以创建一个如下所示的 zip 文件:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

Airflow 将扫描 zip 文件并尝试加载 my_dag1.py 和 my_dag2.py。它不会进入子目录,因为这些被认为是潜在的包。

使用 CeleryExecutor 时,您需要手动同步 DAG 目录,Airflow 不会为您处理这些:

https://airflow.apache.org/configuration.html?highlight=scaling%20out%20celery#scaling-out-with-celery

worker 需要访问其 DAGS_FOLDER,并且您需要通过自己的方式同步文件系统

【讨论】:

您好@ImDarrenG,我遇到了打包的问题。请问take a look at my question【参考方案2】:

虽然按照文档中的说明将 dag 打包成 zip 是我见过的唯一受支持的解决方案,但您也可以导入 dags 文件夹中的模块。如果您使用 puppet 和 git 等其他工具自动同步 dags 文件夹,这将非常有用。

我从问题中不清楚你的目录结构,所以这里是一个基于典型 python 项目结构的示例 dags 文件夹:

└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root
        ├── my_dags  # python src root (usually named same as project)
        │   ├── my_test_globals.py  # file I want to import
        │   ├── dag_in_package.py 
        │   └── dags 
        │        └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

我遗漏了(必需的 [1])__init__.py 文件。注意三个示例 dag 的位置。您几乎可以肯定只会将这些地方之一用于您的所有 dag。为了举例,我将它们都包括在这里,因为它对导入无关紧要。从其中任何一个导入my_test_globals

from my_dags.my_dags import my_test_globals

我相信这意味着 airflow 运行时将 python 路径设置为 dags 目录,因此 dags 文件夹的每个子目录都可以视为一个 python 包。在我的例子中,额外的中间项目根目录妨碍了典型的包内绝对导入。因此,我们可以像这样重组这个气流项目:

└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root & python src root
        ├── my_test_globals.py  # file I want to import
        ├── dag_in_package.py 
        ├── dags 
        │    └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

让导入看起来像我们期望的那样:

from my_dags import my_test_globals

【讨论】:

很好的答案。喜欢目录结构。【参考方案3】:

第二个问题:Airflow+Celery 将如何在工作节点之间分发所有必要的 python 源文件?

来自文档:worker 需要有权访问其 DAGS_FOLDER,并且您需要按照自己的方式同步文件系统。一个常见的设置是将您的 DAGS_FOLDER 存储在 Git 存储库中,并使用 Chef、Puppet、Ansible 或您用于在环境中配置机器的任何工具在机器之间同步它。如果你所有的盒子都有一个共同的挂载点,那么在那里共享你的管道文件应该也可以工作

http://pythonhosted.org/airflow/installation.html?highlight=chef

【讨论】:

【参考方案4】:

对于您的第一个问题,这是可能的。

我猜你应该在与SimplePython.py 相同的目录下创建一个名为__init__.py 的空文件(在你的情况下是worker 目录)。通过这样做worker 目录将被视为一个python模块。

然后在你的 DAG 定义中,尝试from worker.SimplePython import print_context

在您的情况下,我想如果您为气流编写一个插件会更好,因为您可能希望在不删除自定义功能的情况下升级气流核心项目。

【讨论】:

以上是关于气流 - Python 文件不在同一个 DAG 文件夹中的主要内容,如果未能解决你的问题,请参考以下文章

将 python 脚本转换为气流 dag

如何删除气流中的默认示例 dag

在气流上部署 dag 文件的有效方法

气流 mysql 到 gcp Dag 错误

GCP apache气流,如何从私有存储库安装Python依赖项

使用 Apache 气流存储和访问密码