如何与客户运营商验证气流 DAG?

Posted

技术标签:

【中文标题】如何与客户运营商验证气流 DAG?【英文标题】:How to validate airflow DAG with customer operator? 【发布时间】:2018-08-17 11:29:28 【问题描述】:

airflow docs 建议对 DAG 文件进行基本的健全性检查来解释它。即:

$ python ~/path/to/my/dag.py

我发现这很有用。不过,现在我在$AIRFLOW_HOME/plugins下创建了一个插件MordorOperator

从airflow.plugins_manager 导入AirflowPlugin 从airflow.utils.decorators 导入apply_defaults 从airflow.operators 导入BaseOperator 从气流.异常导入气流异常 进口鼠兔 导入json 类 MordorOperator(BaseOperator): JOB_QUEUE_MAPPING = “测试”:“测试” @apply_defaults def __init__(self, job, *args, **kwargs): super().__init__(*args, **kwargs) # 东西 def 执行(自我,上下文): # 东西 类 MordorPlugin(AirflowPlugin): name = "魔多插件" 运营商 = [魔多运营商]

我可以导入插件并在示例 DAG 中查看它的工作原理:

从气流导入 DAG 从气流.操作员导入 MordorOperator 从日期时间导入日期时间 dag = DAG('mordor_dag', description='DAG with a single task', start_date=datetime.today(), catchup=False) hello_operator = MordorOperator(job="testing", task_id='run_single_task', dag=dag)

但是,当我尝试解释这个文件时,我会遇到一些我怀疑我不应该遇到的错误,因为插件成功运行了。我的怀疑是,这是因为在运行时发生了一些动态代码生成,当 DAG 自行解释时,这些代码生成不可用。我还发现 PyCharm 在导入插件时无法执行任何自动补全。

(venv) 下午 3:54 /Users/paymahn/solvvy/scheduler mordor.operator ✱ ❮❮❮ python dags/mordor_test.py 在配置中找不到部分/键 [core/airflow-home] 回溯(最近一次通话最后): 文件“dags/mordor_test.py”,第 2 行,在 从气流.操作员导入 MordorOperator ImportError:无法导入名称“MordorOperator”

如何对使用插件的 DAG 进行健全性测试?是否可以让 PyC​​harm 为自定义运算符提供自动补全功能?

【问题讨论】:

如何判断插件运行成功?你得到你期望的输出?另外,您正在运行哪个版本的 Airflow? FWIW,如果您在 python 中收到“无法导入”错误,这通常意味着您的文件中某处存在语法错误,这可能是由于它找不到依赖项。 我知道它是成功的,因为我可以在气流中运行任务并且可以看到我期望的日志。是的,我得到了我期望的输出。我正在运行气流 1.9。 【参考方案1】:

我在 docker 容器中运行气流,并有一个作为容器入口点运行的脚本。事实证明,当我运行测试时,plugins 文件夹对我的容器不可用。我必须在容器中添加一个符号链接作为安装脚本的一部分。我的问题的解决方案对我来说非常具体,如果其他人偶然发现这个问题,除了:确保您的插件文件夹正确可用之外,我对您的情况没有很好的答案。

【讨论】:

以上是关于如何与客户运营商验证气流 DAG?的主要内容,如果未能解决你的问题,请参考以下文章

运营商之间的气流和数据传输

气流:每日刷新后如何在 s3 存储桶中公开对象

气流:如何删除 DAG?

如何防止气流回填 dag 运行?

如何删除气流中的默认示例 dag

气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators