在气流上部署 dag 文件的有效方法

Posted

技术标签:

【中文标题】在气流上部署 dag 文件的有效方法【英文标题】:Efficient way to deploy dag files on airflow 【发布时间】:2018-06-30 21:57:19 【问题描述】:

在将新的 dag 部署到气流中时,是否遵循任何最佳做法?

我在 google 论坛上看到几个 cmets 说 dag 保存在 GIT 存储库中,并且会定期同步到气流集群中的本地位置。 关于这种方法,我有几个问题

我们是否为不同的环境维护单独的 dag 文件? (测试。生产) 如果新版本出现错误,如何处理 ETL 回滚到旧版本?

非常感谢这里的任何帮助。如果您需要更多详细信息,请告诉我?

【问题讨论】:

【参考方案1】:

到目前为止,Airflow 还没有自己的版本控制工作流功能(请参阅this)。 但是,您可以通过在自己的 git 存储库中管理 DAG 并将其状态作为子模块获取到气流存储库中来自行管理。通过这种方式,您始终拥有单个气流版本,其中包含具有特定版本的 DAG 集。观看更多here

【讨论】:

【参考方案2】:

这是我们为团队管理它的方式。

首先在命名约定方面,我们的每个 DAG 文件名 都与 DAG 本身(包括 DAG 版本)内容中的 DAG Id 匹配。这很有用,因为最终它是您在 Airflow UI 中看到的 DAG Id,因此您将确切知道每个 DAG 背后使用了哪个文件。

这样的 DAG 示例:

from airflow import DAG
from datetime import datetime, timedelta

default_args = 
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime(2017,12,05,23,59),
  'email': ['me@mail.com'],
  'email_on_failure': True


dag = DAG(
  'my_nice_dag-v1.0.9', #update version whenever you change something
  default_args=default_args,
  schedule_interval="0,15,30,45 * * * *",
  dagrun_timeout=timedelta(hours=24),
  max_active_runs=1)
  [...]

DAG 文件的名称为:my_nice_dag-v1.0.9.py

我们所有的 DAG 文件都存储在 Git 存储库中(除其他外) 每次在我们的主分支中完成合并请求时,我们的持续集成管道都会启动一个新构建并将我们的 DAG 文件打包到一个 zip 中(我们使用 Atlassian Bamboo,但还有其他解决方案,如 Jenkins、Circle CI、Travis...) 在 Bamboo 中,我们配置了一个部署脚本 (shell),它解压缩包并将 DAG 文件放在 Airflow 服务器上的 /dags 文件夹中。 我们通常将 DAG 部署在 DEV 中进行测试,然后部署到 UAT,最后部署到 PROD。借助上面提到的 shell 脚本,只需单击 Bamboo UI 中的按钮即可完成部署。

好处

    由于您在文件名中包含了 DAG 版本,因此 DAG 文件夹中的 DAG 文件的先前版本不会被覆盖,因此您可以轻松地返回到它 当您在 Airflow 中加载新的 DAG 文件时,您可以通过版本号在 UI 中识别它。 因为您的 DAG 文件名 = DAG Id,您甚至可以通过添加一些 Airflow 命令行来改进部署脚本,以便在部署新 DAG 后自动打开它们。 因为 DAG 的每个版本都在 Git 中进行了历史化,所以如果需要,我们可以随时恢复到以前的版本。

【讨论】:

嗨,Alexis,感谢您的澄清,所以如果有任何特定于环境的值,比如 HttpOperator 中的 URL,这些值是如何处理的,您可以为每个环境维护单独的 dag 文件或使用一些配置管理系统一样吗? 你好@SreenathKamath,对于环境特定的值,我们在各自的气流环境中的气流变量中配置它们。您可以在 Admin > Variables 下的菜单中找到它们。在您的 DAG 中,您可以使用 from airflow.models import Variable 调用这些变量,然后使用 Variable.get('my_variable_name') @SreenathKamath 如果答案让您满意,请考虑将此问题标记为已解决。谢谢 @alexis 如果我只是更新同名文件(dag),我不需要打开。 @alexis 我的另一个问题是你如何测试你的 DAG 文件?

以上是关于在气流上部署 dag 文件的有效方法的主要内容,如果未能解决你的问题,请参考以下文章

部署气流代码库

气流 - 如何仅“填充 DagBag”一次

气流:如何删除 DAG?

如何删除气流中的默认示例 dag

气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators

气流 mysql 到 gcp Dag 错误