无法导入 Airflow 插件
Posted
技术标签:
【中文标题】无法导入 Airflow 插件【英文标题】:Can't import Airflow plugins 【发布时间】:2017-10-10 00:10:54 【问题描述】:跟随气流教程here。
问题:网络服务器返回以下错误
Broken DAG: [/usr/local/airflow/dags/test_operator.py] cannot import name
MyFirstOperator
注意事项: 目录结构如下:
airflow_home
├── airflow.cfg
├── airflow.db
├── dags
│ └── test_operators.py
├── plugins
│ └── my_operators.py
└── unittests.cfg
我正在尝试像这样在“test_operators.py”中导入插件:
from airflow.operators import MyFirstOperator
代码与教程中的完全相同。
【问题讨论】:
我以前从未使用过 Airflow。但是你用pip安装了项目的需求了吗? @cbll -- 是的,一切都是根据文档安装的:airflow.incubator.apache.org/installation.html 顺便说一句,我在这里看到了一个相当相似的教程:technofob.com/2019/05/30/… 顺便说一句,请查看***.com/questions/43380679/…,了解如何让 PyCharm 理解代码。 astronomer.io 的方法大纲(参见@Bjorn 的回答)效果很好。此外,当新操作符添加到plugins
文件夹或新 dags 添加到 dags
文件夹时,我确实 NOT 必须重新启动任何服务。 注意:在带有 EFS 的 Amazon Fargate 上进行了测试,以在 Web 服务器、调度程序和工作容器之间同步 dags
和 plugins
。
【参考方案1】:
在努力研究 Airflow 文档并在此处尝试一些答案但没有成功后,我找到了 this approach from astronomer.io。
正如他们所指出的,构建 Airflow 插件可能会让人感到困惑,而且可能不是未来添加钩子和运算符的最佳方式。
自定义挂钩和运算符是扩展 Airflow 以满足您的需求的强大方法。然而,在最佳方式上存在一些混淆 实施它们。根据 Airflow 文档,它们可以是 使用 Airflow 的插件机制添加。然而,这过于复杂 这个问题并导致许多人感到困惑。气流均匀 考虑弃用使用插件机制的钩子和 未来的运营商。
因此,我没有使用 Plugins API,而是遵循了 Astronomer 的方法,如下所示设置 Airflow。
dags
└── my_dag.py (contains dag and tasks)
plugins
├── __init__.py
├── hooks
│ ├── __init__.py
│ └── mytest_hook.py (contains class MyTestHook)
└── operators
├── __init__.py
└── mytest_operator.py (contains class MyTestOperator)
使用这种方法,我的操作符和钩子的所有代码都完全存在于它们各自的文件中 - 并且没有令人困惑的插件文件。所有__init__.py
文件都是空的(与将插件代码放入其中一些同样令人困惑的方法不同)。
对于所需的import
s,请考虑 Airflow 实际如何使用插件目录:
当 Airflow 运行时,它会将 dags/、plugins/ 和 config/ 添加到 PATH
这意味着执行from airflow.operators.mytest_operator import MyTestOperator
可能行不通。
相反,from operators.mytest_operator import MyTestOperator
是要走的路(请注意我上面设置中与from directory/file.py import Class
的对齐方式)。
我的文件中的工作 sn-ps 如下所示。
my_dag.py:
from airflow import DAG
from operators.mytest_operator import MyTestOperator
default_args = ....
dag = DAG(....)
....
mytask = MyTestOperator(task_id='MyTest Task', dag=dag)
....
my_operator.py:
from airflow.models import BaseOperator
from hooks.mytest_hook import MyTestHook
class MyTestOperator(BaseOperator):
....
hook = MyTestHook(....)
....
my_hook.py:
class MyTestHook():
....
这对我有用,并且比尝试继承 AirflowPlugin 简单得多。但是,如果您想更改网络服务器 UI,它可能不适合您:
注意:插件机制仍然必须用于插件 对网络服务器 UI 的更改。
顺便说一句,我在此之前遇到的错误(现已解决):
ModuleNotFoundError: No module named 'mytest_plugin.hooks.mytest_hook'
ModuleNotFoundError: No module named 'operators.mytest_plugin'
【讨论】:
我认为这是最好的方法。调度程序和/或网络服务器是否需要重新启动?我没有看到 astronomer.io 文章中提到的内容? 我确认此方法适用于服务器和 webUI 重启(也许 webUI 没用,但我都做了)。 我不记得我是否重新启动了气流调度程序服务。可能:) 不需要重新启动气流网络服务器服务。仅供参考,有 Airflow 独立的工作进程(它们从调度程序生成的队列中提取任务),这些可能会变得陈旧。如果有疑问,请重新启动调度程序(并仔细检查停止和启动之间是否存在任何陈旧的工作进程)。这假设使用我正在使用的本地/顺序执行器,不确定分布式设置,例如使用芹菜工人。 很好的答案。这对我有用。谢谢。【参考方案2】:我使用气流 1.10。 如果要导入的是自定义算子,可以上传到airflow plugins文件夹,然后在DAG中指定导入为:
来自 [文件名] 导入 [类名]
在哪里: filename 是您的插件文件的名称 classname 是您的班级的名称。
例如: 如果您的文件名是 my_first_plugin 并且类名是 MyFirstOperator 那么,导入将是:
从 my_first_plugin 导入 MyFirstOperator
在我使用 airflow 1.10 时为我工作
谢谢!希望这会有所帮助!
【讨论】:
虽然这可行并且显然更简单,但我想知道为什么 Airflow 推荐插件机制,即拥有plugins/__init__.py
和 class MyPlugin(AirflowPlugin): name = 'my_first_plugin' operators = [MyFirstOperator]
我看到的唯一“优势”是然后你会导入插件为from airflow.operators.my_first_plugin import MyFirstOperator
【参考方案3】:
在文章中是这样的:
class MyFirstPlugin(AirflowPlugin):
name = "my_first_plugin"
operators = [MyFirstOperator]
改为使用:
class MyFirstPlugin(AirflowPlugin):
name = "my_first_plugin"
operators = [MyFirstOperator]
# A list of class(es) derived from BaseHook
hooks = []
# A list of class(es) derived from BaseExecutor
executors = []
# A list of references to inject into the macros namespace
macros = []
# A list of objects created from a class derived
# from flask_admin.BaseView
admin_views = []
# A list of Blueprint object created from flask.Blueprint
flask_blueprints = []
# A list of menu links (flask_admin.base.MenuLink)
menu_links = []
也不要使用:
from airflow.operators import MyFirstOperator
According to the airflow article on plugins, it should be:
from airflow.operators.my_first_plugin import MyFirstOperator
如果这不起作用,请尝试:
from airflow.operators.my_operators import MyFirstOperator
如果这不起作用,请在启动时检查您的网络服务器日志以获取更多信息。
【讨论】:
谢谢,我已经试过了 - 在导入下,它会引发“没有名为 'my_first_plugin'、'my_operators' 的模块。 您使用的是哪个版本的气流?如果是 1.7 可以升级到 1.8 吗? 对于 1.8,您可以在 source code 中找到此提示:直接从 'airflow.operators' 导入插件运算符 ... 已被弃用。请改为从“airflow.operators.[plugin_module]”导入。 Airflow 2.0 将完全放弃对直接导入的支持。 AirflowPlugin 子类的name 属性将成为模块名。例如如果name = "my_first_plugin"
然后在 dag 中使用 from airflow.operators.my_first_plugin import MyFirstOperator
。 my_first_plugin
绝对行不通。正如@ChristophHösler 所提到的,旧方式from airflow.operators import MyFirstOperator
有效,但会因为它污染命名空间而被删除。新方式:github.com/apache/incubator-airflow/blob/master/airflow/… 旧方式github.com/apache/incubator-airflow/blob/master/airflow/…
截至今天使用气流 1.10,格式“从气流.operators 导入 MyFirstOperator”对我来说可以加载传感器。【参考方案4】:
我重新启动了网络服务器,现在一切正常。
以下是我认为可能发生的情况:
-
在开始使用教程示例之前,我尝试运行自己的插件和 dag。第一次运行时我修复了一个小语法错误,但修复后我开始收到“无法导入名称”错误。
我删除了插件和 dag,并尝试使用教程中的那个来看看发生了什么。
我的猜测是第 1 步的错误以某种方式影响了第 2 步。
【讨论】:
根据我的经验,添加/修改任何插件时都需要重新启动网络服务器。 @Daniel Lee 在这里提出了一个很好的观点,您还需要重新启动您的网络服务器和调度程序,至少这在 Airflow 1.8.2 上对我有用 这在 1.8.2 上是正确的...需要在其他版本上测试。 Ctrl-c 杀死它然后重新启动它。 @howMuchCheeseIsTooMuchCheese 只是一个小提示:当你向插件添加任何东西时,通常需要重新启动 Web 服务器。当网络服务器重新启动标准输出中的前几行(如果网络服务器处于调试日志记录模式)将是插件导入。如果您的插件语法有任何问题,它们将显示在那里。另外需要注意的是,不要在操作符的 init 函数中放入任何昂贵的操作,这些操作将在每次调度程序循环时执行。【参考方案5】:Airflow 版本 2 引入了一种新的插件管理机制,如 their official documentation 所述:
在 2.0 版中更改:通过 airflow.operators,sensors, hooks.
导入插件中添加的运算符、传感器、挂钩 ,并且这些扩展应该作为常规的 python 模块导入。更多信息请参阅:模块管理和创建自定义算子
您需要管理您的 Python 代码,只需将您的代码放在 plugins
文件夹中,然后从此时开始寻址文件。假设您在路径$AIRFLOW_HOME/plugins/t_plugin/operators/test.py
的test.py
文件中编写了 TestClass,在 dag 文件中您可以这样导入:
from t_plugin.operators.test import TestClass
【讨论】:
【参考方案6】:我必须更新文件 airflow.cfg
中的插件路径才能解决问题。
您的 Airflow 插件的存储位置:
plugins_folder = /airflow/plugins
【讨论】:
【参考方案7】:我在关注these tutorials 时遇到了同样的错误。
然而,我的错是我在task_id
中使用了空格字符' '
,Airflow
不支持。
显然错误并没有指向实际问题。重新启动 Airflow scheduler
和 webserver
然后在 WebUI 上显示正确的错误消息。
【讨论】:
从source-code可以看出,dag_id
s & task_id
s 除了字母数字字符【参考方案8】:
根据文档 -
插件文件夹中的 python 模块被导入,钩子、操作符、传感器、宏、执行器和 Web 视图被集成到 Airflow 的主要集合中并可供使用。
在 1.10.1 版中运行良好
【讨论】:
【参考方案9】:就我而言,我设法通过以下步骤制作了一个自定义运算符:
-
气流 10.3
在 DAG 文件中
from airflow.operators import MacrosPostgresOperator
在 ~/airflow/plugins 文件夹中,我有一个 python 文件custom_operator.py
,代码非常简单
from airflow.plugins_manager import AirflowPlugin
from airflow.operators.postgres_operator import PostgresOperator
class MacrosPostgresOperator(PostgresOperator):
template_fields = ('sql', 'parameters')
class MacrosFirstPlugin(AirflowPlugin):
name = "macros_first_plugin"
operators = [MacrosPostgresOperator]
【讨论】:
【参考方案10】:您必须停止 (CTRL-C) 并重新启动您的 Airflow 网络服务器和调度程序。
【讨论】:
【参考方案11】:按照相同的教程,我遇到了同样的问题。对我有用的是将MyFirstOperator
的导入替换为:
from airflow_home.plugins.my_operators import MyFirstOperator
【讨论】:
【参考方案12】:假设,以下是您在my_operators.py
中实现的自定义插件,
class MyFirstPlugin(AirflowPlugin):
name = "my_first_plugin"
operators = [MyFirstOperator]
那么按照Airflow documentation,你必须导入如下结构,
from airflow.type, like "operators", "sensors".name specified inside the plugin class import *
所以,在你的情况下,你应该像下面这样导入,
from airflow.operators.my_first_plugin import MyFirstOperator
【讨论】:
以上是关于无法导入 Airflow 插件的主要内容,如果未能解决你的问题,请参考以下文章
Apache Airflow Celery Executor。导入一个本地自定义的python包