无法导入 Airflow 插件

Posted

技术标签:

【中文标题】无法导入 Airflow 插件【英文标题】:Can't import Airflow plugins 【发布时间】:2017-10-10 00:10:54 【问题描述】:

跟随气流教程here。

问题:网络服务器返回以下错误

Broken DAG: [/usr/local/airflow/dags/test_operator.py] cannot import name 
MyFirstOperator

注意事项: 目录结构如下:

airflow_home
├── airflow.cfg
├── airflow.db
├── dags
│   └── test_operators.py  
├── plugins
│   └── my_operators.py   
└── unittests.cfg

我正在尝试像这样在“test_operators.py”中导入插件:

from airflow.operators import MyFirstOperator

代码与教程中的完全相同。

【问题讨论】:

我以前从未使用过 Airflow。但是你用pip安装了项目的需求了吗? @cbll -- 是的,一切都是根据文档安装的:airflow.incubator.apache.org/installation.html 顺便说一句,我在这里看到了一个相当相似的教程:technofob.com/2019/05/30/… 顺便说一句,请查看***.com/questions/43380679/…,了解如何让 PyC​​harm 理解代码。 astronomer.io 的方法大纲(参见@Bjorn 的回答)效果很好。此外,当新操作符添加到 plugins 文件夹或新 dags 添加到 dags 文件夹时,我确实 NOT 必须重新启动任何服务。 注意:在带有 EFS 的 Amazon Fargate 上进行了测试,以在 Web 服务器、调度程序和工作容器之间同步 dagsplugins 【参考方案1】:

在努力研究 Airflow 文档并在此处尝试一些答案但没有成功后,我找到了 this approach from astronomer.io。

正如他们所指出的,构建 Airflow 插件可能会让人感到困惑,而且可能不是未来添加钩子和运算符的最佳方式。

自定义挂钩和运算符是扩展 Airflow 以满足您的需求的强大方法。然而,在最佳方式上存在一些混淆 实施它们。根据 Airflow 文档,它们可以是 使用 Airflow 的插件机制添加。然而,这过于复杂 这个问题并导致许多人感到困惑。气流均匀 考虑弃用使用插件机制的钩子和 未来的运营商。

因此,我没有使用 Plugins API,而是遵循了 Astronomer 的方法,如下所示设置 Airflow。

dags
└── my_dag.py               (contains dag and tasks)
plugins
├── __init__.py
├── hooks
│   ├── __init__.py
│   └── mytest_hook.py      (contains class MyTestHook)
└── operators
    ├── __init__.py
    └── mytest_operator.py  (contains class MyTestOperator)

使用这种方法,我的操作符和钩子的所有代码都完全存在于它们各自的文件中 - 并且没有令人困惑的插件文件。所有__init__.py 文件都是空的(与将插件代码放入其中一些同样令人困惑的方法不同)。

对于所需的imports,请考虑 Airflow 实际如何使用插件目录:

当 Airflow 运行时,它会将 dags/、plugins/ 和 config/ 添加到 PATH

这意味着执行from airflow.operators.mytest_operator import MyTestOperator 可能行不通。 相反,from operators.mytest_operator import MyTestOperator 是要走的路(请注意我上面设置中与from directory/file.py import Class 的对齐方式)。

我的文件中的工作 sn-ps 如下所示。

my_dag.py:

from airflow import DAG
from operators.mytest_operator import MyTestOperator
default_args = ....
dag = DAG(....)
....
mytask = MyTestOperator(task_id='MyTest Task', dag=dag)
....

my_operator.py:

from airflow.models import BaseOperator
from hooks.mytest_hook import MyTestHook

class MyTestOperator(BaseOperator):
    ....
    hook = MyTestHook(....)
    ....

my_hook.py:

class MyTestHook():
    ....

这对我有用,并且比尝试继承 AirflowPlugin 简单得多。但是,如果您想更改网络服务器 UI,它可能不适合您:

注意:插件机制仍然必须用于插件 对网络服务器 UI 的更改。

顺便说一句,我在此之前遇到的错误(现已解决):

ModuleNotFoundError: No module named 'mytest_plugin.hooks.mytest_hook'
ModuleNotFoundError: No module named 'operators.mytest_plugin'

【讨论】:

我认为这是最好的方法。调度程序和/或网络服务器是否需要重新启动?我没有看到 astronomer.io 文章中提到的内容? 我确认此方法适用于服务器和 webUI 重启(也许 webUI 没用,但我都做了)。 我不记得我是否重新启动了气流调度程序服务。可能:) 不需要重新启动气流网络服务器服务。仅供参考,有 Airflow 独立的工作进程(它们从调度程序生成的队列中提取任务),这些可能会变得陈旧。如果有疑问,请重新启动调度程序(并仔细检查停止和启动之间是否存在任何陈旧的工作进程)。这假设使用我正在使用的本地/顺序执行器,不确定分布式设置,例如使用芹菜工人。 很好的答案。这对我有用。谢谢。【参考方案2】:

我使用气流 1.10。 如果要导入的是自定义算子,可以上传到airflow plugins文件夹,然后在DAG中指定导入为:

来自 [文件名] 导入 [类名]

在哪里: filename 是您的插件文件的名称 classname 是您的班级的名称。

例如: 如果您的文件名是 my_first_plugin 并且类名是 MyFirstOperator 那么,导入将是:

my_first_plugin 导入 MyFirstOperator

在我使用 airflow 1.10 时为我工作

谢谢!希望这会有所帮助!

【讨论】:

虽然这可行并且显然更简单,但我想知道为什么 Airflow 推荐插件机制,即拥有 plugins/__init__.pyclass MyPlugin(AirflowPlugin): name = 'my_first_plugin' operators = [MyFirstOperator] 我看到的唯一“优势”是然后你会导入插件为from airflow.operators.my_first_plugin import MyFirstOperator【参考方案3】:

在文章中是这样的:

class MyFirstPlugin(AirflowPlugin):
    name = "my_first_plugin"
    operators = [MyFirstOperator]

改为使用:

class MyFirstPlugin(AirflowPlugin):
    name = "my_first_plugin"
    operators = [MyFirstOperator]
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of class(es) derived from BaseExecutor
    executors = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of objects created from a class derived
    # from flask_admin.BaseView
    admin_views = []
    # A list of Blueprint object created from flask.Blueprint
    flask_blueprints = []
    # A list of menu links (flask_admin.base.MenuLink)
    menu_links = []

也不要使用:

from airflow.operators import MyFirstOperator

According to the airflow article on plugins, it should be:

from airflow.operators.my_first_plugin import MyFirstOperator

如果这不起作用,请尝试:

from airflow.operators.my_operators import MyFirstOperator

如果这不起作用,请在启动时检查您的网络服务器日志以获取更多信息。

【讨论】:

谢谢,我已经试过了 - 在导入下,它会引发“没有名为 'my_first_plugin'、'my_operators' 的模块。 您使用的是哪个版本的气流?如果是 1.7 可以升级到 1.8 吗? 对于 1.8,您可以在 source code 中找到此提示:直接从 'airflow.operators' 导入插件运算符 ... 已被弃用。请改为从“airflow.operators.[plugin_module]”导入。 Airflow 2.0 将完全放弃对直接导入的支持。 AirflowPlugin 子类的name 属性将成为模块名。例如如果 name = "my_first_plugin" 然后在 dag 中使用 from airflow.operators.my_first_plugin import MyFirstOperatormy_first_plugin 绝对行不通。正如@ChristophHösler 所提到的,旧方式from airflow.operators import MyFirstOperator 有效,但会因为它污染命名空间而被删除。新方式:github.com/apache/incubator-airflow/blob/master/airflow/… 旧方式github.com/apache/incubator-airflow/blob/master/airflow/… 截至今天使用气流 1.10,格式“从气流.operators 导入 MyFirstOperator”对我来说可以加载传感器。【参考方案4】:

我重新启动了网络服务器,现在一切正常。

以下是我认为可能发生的情况:

    在开始使用教程示例之前,我尝试运行自己的插件和 dag。第一次运行时我修复了一个小语法错误,但修复后我开始收到“无法导入名称”错误。 我删除了插件和 dag,并尝试使用教程中的那个来看看发生了什么。

我的猜测是第 1 步的错误以某种方式影响了第 2 步。

【讨论】:

根据我的经验,添加/修改任何插件时都需要重新启动网络服务器。 @Daniel Lee 在这里提出了一个很好的观点,您还需要重新启动您的网络服务器和调度程序,至少这在 Airflow 1.8.2 上对我有用 这在 1.8.2 上是正确的...需要在其他版本上测试。 Ctrl-c 杀死它然后重新启动它。 @howMuchCheeseIsTooMuchCheese 只是一个小提示:当你向插件添加任何东西时,通常需要重新启动 Web 服务器。当网络服务器重新启动标准输出中的前几行(如果网络服务器处于调试日志记录模式)将是插件导入。如果您的插件语法有任何问题,它们将显示在那里。另外需要注意的是,不要在操作符的 init 函数中放入任何昂贵的操作,这些操作将在每次调度程序循环时执行。【参考方案5】:

Airflow 版本 2 引入了一种新的插件管理机制,如 their official documentation 所述:

在 2.0 版中更改:通过 airflow.operators,sensors, hooks. 导入插件中添加的运算符、传感器、挂钩,并且这些扩展应该作为常规的 python 模块导入。更多信息请参阅:模块管理和创建自定义算子

您需要管理您的 Python 代码,只需将您的代码放在 plugins 文件夹中,然后从此时开始寻址文件。假设您在路径$AIRFLOW_HOME/plugins/t_plugin/operators/test.pytest.py 文件中编写了 TestClass,在 dag 文件中您可以这样导入:

from t_plugin.operators.test import TestClass

【讨论】:

【参考方案6】:

我必须更新文件 airflow.cfg 中的插件路径才能解决问题。

您的 Airflow 插件的存储位置:

plugins_folder = /airflow/plugins

【讨论】:

【参考方案7】:

我在关注these tutorials 时遇到了同样的错误。

然而,我的错是我在task_id 中使用了空格字符' 'Airflow 不支持。

显然错误并没有指向实际问题。重新启动 Airflow schedulerwebserver 然后在 WebUI 上显示正确的错误消息。

【讨论】:

从source-code可以看出,dag_ids & task_ids 除了字母数字字符【参考方案8】:

根据文档 -

插件文件夹中的 python 模块被导入,钩子、操作符、传感器、宏、执行器和 Web 视图被集成到 Airflow 的主要集合中并可供使用。

在 1.10.1 版中运行良好

【讨论】:

【参考方案9】:

就我而言,我设法通过以下步骤制作了一个自定义运算符:

    气流 10.3 在 DAG 文件中from airflow.operators import MacrosPostgresOperator 在 ~/airflow/plugins 文件夹中,我有一个 python 文件custom_operator.py,代码非常简单
from airflow.plugins_manager import AirflowPlugin
from airflow.operators.postgres_operator import PostgresOperator

 class MacrosPostgresOperator(PostgresOperator):
    template_fields = ('sql', 'parameters')

class MacrosFirstPlugin(AirflowPlugin):
    name = "macros_first_plugin"
    operators = [MacrosPostgresOperator]

【讨论】:

【参考方案10】:

您必须停止 (CTRL-C) 并重新启动您的 Airflow 网络服务器和调度程序。

【讨论】:

【参考方案11】:

按照相同的教程,我遇到了同样的问题。对我有用的是将MyFirstOperator 的导入替换为:

from airflow_home.plugins.my_operators import MyFirstOperator

【讨论】:

【参考方案12】:

假设,以下是您在my_operators.py 中实现的自定义插件,

class MyFirstPlugin(AirflowPlugin):
    name = "my_first_plugin"
    operators = [MyFirstOperator]

那么按照Airflow documentation,你必须导入如下结构,

from airflow.type, like "operators", "sensors".name specified inside the plugin class import *

所以,在你的情况下,你应该像下面这样导入,

from airflow.operators.my_first_plugin import MyFirstOperator

【讨论】:

以上是关于无法导入 Airflow 插件的主要内容,如果未能解决你的问题,请参考以下文章

Airflow自定义插件, 使用datax抽数

大数据调度平台Airflow:Airflow使用

Apache Airflow Celery Executor。导入一个本地自定义的python包

airflow异常捕捉

Google Cloud Composer (Apache Airflow) 无法访问日志文件

无法从 Airflow 应用程序访问 Vault 服务器