如何与客户运营商验证气流 DAG?
Posted
技术标签:
【中文标题】如何与客户运营商验证气流 DAG?【英文标题】:How to validate airflow DAG with customer operator? 【发布时间】:2018-08-17 11:29:28 【问题描述】:airflow docs 建议对 DAG 文件进行基本的健全性检查来解释它。即:
$ python ~/path/to/my/dag.py我发现这很有用。不过,现在我在$AIRFLOW_HOME/plugins
下创建了一个插件MordorOperator
:
我可以导入插件并在示例 DAG 中查看它的工作原理:
从气流导入 DAG 从气流.操作员导入 MordorOperator 从日期时间导入日期时间 dag = DAG('mordor_dag', description='DAG with a single task', start_date=datetime.today(), catchup=False) hello_operator = MordorOperator(job="testing", task_id='run_single_task', dag=dag)但是,当我尝试解释这个文件时,我会遇到一些我怀疑我不应该遇到的错误,因为插件成功运行了。我的怀疑是,这是因为在运行时发生了一些动态代码生成,当 DAG 自行解释时,这些代码生成不可用。我还发现 PyCharm 在导入插件时无法执行任何自动补全。
(venv) 下午 3:54 /Users/paymahn/solvvy/scheduler mordor.operator ✱ ❮❮❮ python dags/mordor_test.py 在配置中找不到部分/键 [core/airflow-home] 回溯(最近一次通话最后): 文件“dags/mordor_test.py”,第 2 行,在 从气流.操作员导入 MordorOperator ImportError:无法导入名称“MordorOperator”如何对使用插件的 DAG 进行健全性测试?是否可以让 PyCharm 为自定义运算符提供自动补全功能?
【问题讨论】:
如何判断插件运行成功?你得到你期望的输出?另外,您正在运行哪个版本的 Airflow? FWIW,如果您在 python 中收到“无法导入”错误,这通常意味着您的文件中某处存在语法错误,这可能是由于它找不到依赖项。 我知道它是成功的,因为我可以在气流中运行任务并且可以看到我期望的日志。是的,我得到了我期望的输出。我正在运行气流 1.9。 【参考方案1】:我在 docker 容器中运行气流,并有一个作为容器入口点运行的脚本。事实证明,当我运行测试时,plugins
文件夹对我的容器不可用。我必须在容器中添加一个符号链接作为安装脚本的一部分。我的问题的解决方案对我来说非常具体,如果其他人偶然发现这个问题,除了:确保您的插件文件夹正确可用之外,我对您的情况没有很好的答案。
【讨论】:
以上是关于如何与客户运营商验证气流 DAG?的主要内容,如果未能解决你的问题,请参考以下文章