Airflow DAG 从哪里获得它的依赖关系?

Posted

技术标签:

【中文标题】Airflow DAG 从哪里获得它的依赖关系?【英文标题】:Where does Airflow DAGs get its dependencies? 【发布时间】:2019-12-17 09:34:23 【问题描述】:

我正在努力使用 Airflow 执行一些非常简单的任务。

对于上下文,我使用 docker-compose 来运行带有 Airflow 和 Postgres 的 docker 容器。 (https://github.com/puckel/docker-airflow)

我正在尝试测试我们的一个内部库与 Airflow 的集成。我用来快速测试的不是很干净的方法是将 docker exec 放入气流容器并 pip 安装适当的库(通过主机共享到具有只读模式的 Docker 卷的容器)。

所有东西都用 pip 正确安装,我可以在运行虚拟 Python 脚本时使用我的库。

但是,当我在 DAG python 文件中集成相同的逻辑时,我收到错误“损坏的 dag,没有名为 inhouse_lib 的模块。

起初我认为 Airflow 是在相对于 Python 版本的特定 pip 目录中选择依赖项,并且我将库安装在另一个 pip 目录中。

但对于所有 Python 二进制文件,它们都使用 Python 3.7。

对于我在创建 pip 列表时拥有的所有 pip 二进制文件(pip、pip3、pip3.7),我可以找到我的内部库。

我不明白我应该如何部署我的库以便 Airflow 可以接收它们。任何见解都将不胜感激。

感谢您的帮助。

编辑 为了澄清我试图做的事情,下面有一些细节。在我的 DAG 中,我想使用一个自定义 Python 库(我们称其为尚未实现的 myLib 功能。一旦实现,我想将这个最新版本的 myLib 部署到气流容器中。

我用一个卷更新了 docker-compose.yml,该卷将我的主机目录与容器气流主页上的 myLib 映射。

# Go in the container
docker exec -it <airflow docker container ID> bash

# Install myLib to Python environment
pip install myLib

# Check the installation
pip list | grep myLib # output myLib

# Check the import in Python REPL
python
import myLib # No Python error

相同的导入在我的 Airflow DAG 中不起作用。检查容器日志时,出现以下错误:

[2019-08-30 15:14:30,499] __init__.py:51 INFO - Using executor LocalExecutor
[2019-08-30 15:14:30,894] dagbag.py:90 INFO - Filling up the DagBag from /usr/local/airflow/dags
[2019-08-30 15:14:30,897] dagbag.py:205 ERROR - Failed to import: /usr/local/airflow/dags/mydag.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 202, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/airflow/dags/mydag.py", line 7, in <module>
    import myLib
ModuleNotFoundError: No module named 'myLib'
[2019-08-30 15:14:31 +0000] [167] [INFO] Handling signal: ttou
[2019-08-30 15:14:31 +0000] [11446] [INFO] Worker exiting (pid: 11446)

【问题讨论】:

你有没有想过这个问题?我遇到了同样的问题。我可以运行 python 并毫无错误地导入我需要的所有内容,但是当 Airflow 尝试加载 dag 时,它给了我一堆 Module not found 错误。这让我快疯了! 我们在最新的气流版本中遇到了同样的问题。任何人都可以在这里提出建议。 【参考方案1】:

对于每个 dag,您需要在运行前对其进行测试。

您可以使用以下 cli 命令来检查环境和代码逻辑:

airflow list dags
airflow test [dag_name] [task_name] [date]

根据您的问题,您应该面临环境依赖性问题。您可以通过 docker 容器中的气流列表 dags 来检查它。为了解决您的任务,我们有两种方法: 1.在airflow.cfg文件中设置dags文件夹,把你的module文件放到dags文件夹中。

    检查气流环境中的python路径,确保您的模块可以访问。

=========================== 更新1: 为了检查您的模块是否正确安装,您可以使用以下命令:

    码头图片 | grep [你的气流图像名称] 找到容器ID docker run [容器 id] python 在 docker python 环境中检查您的模型是否正确安装。 like : import os 如果您收到任何错误消息,您 需要通过 pip 双重检查您的模块安装。

更新 2: 为了检查您的依赖性,您可以: 编写一个简单的 dags,然后使用气流测试 [dag_name] [dag_task_name] [date] 看看是否工作。

据我了解,您可以尝试从零开始构建气流图像,它可能会很好地工作。 docker 容器可能会假设一些环境。和用户实例。

如果您更喜欢继续使用网络 posed 容器,您可以尝试在登录时切换到相同的使用 id 并安装您的 python 库。喜欢跟随模式

docker exec -u [user_name you can find you the dockerfile] [container_id] you command.

不要忘记将每次更改提交到新图像 ID 并从新图像 ID 加载容器,否则每次运行时可能会丢失更改

【讨论】:

感谢您的帮助。关于您的 1. 和 2. 这是导入自定义库的方式吗?将每个模块和包放在 dag bag 中?我希望使用安装了 pip 的库。您是否知道是否有办法确定 Airflow DAG 选择导入库的位置?例如路径?谢谢 在您更新后:这就是我所做的。在 pip 安装我的包之后,我创建了一个包含自定义包导入的虚拟 Python 脚本。一切正常。但是,当我在 Airflow DAG 中执行相同操作时,无法解决依赖关系。所以我的问题是,有没有办法确定 Airflow 在运行时选择其依赖项的位置? 你可以写一个简单的dag,它只是导入模型。这样你就可以看到你的模型依赖是否运作良好。 我们遇到了同样的问题,它来自最新版本的气流。【参考方案2】:

使用以下内容构建 puckel dockerfile:

docker build --rm --build-arg PYTHON_DEPS="flask_oauthlib>=0.9" -t puckel/docker-airflow .

将要通过 pip 安装的 Python 依赖项列表添加到 PYTHON_DEPS 变量中,以逗号分隔的列表形式。

这将构建安装了依赖项的映像,然后您可以在 dags 中使用它,只需 import yourpackage

【讨论】:

感谢 kaxil 的帮助。你知道在容器运行时是否有办法吗?在我的开发流程中,我会定期对依赖项进行修改。另一个问题:在你的代码块之后,你如何传递自定义 Python 依赖项?谢谢

以上是关于Airflow DAG 从哪里获得它的依赖关系?的主要内容,如果未能解决你的问题,请参考以下文章

大数据调度平台Airflow:Airflow使用

Airflow 中文文档:概念

airflow Operators

入门Airflow 编写第一个DAG

让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?

气流 DAG 步骤依赖项