气流:如何删除 DAG?

Posted

技术标签:

【中文标题】气流:如何删除 DAG?【英文标题】:Airflow: how to delete a DAG? 【发布时间】:2017-03-31 19:28:21 【问题描述】:

我已经启动了 Airflow 网络服务器并安排了一些 dags。我可以在 Web GUI 上看到 dags。

如何删除正在运行并在 Web GUI 中显示的特定 DAG?是否有 Airflow CLI 命令可以执行此操作?

我环顾四周,但找不到在加载和调度 DAG 后删除 DAG 的简单方法的答案。

【问题讨论】:

这里没有 CLI。但是,如果您想尝试恢复它,则会放弃一个拉取请求:github.com/apache/incubator-airflow/pull/1344 在 Airflow 版本 【参考方案1】:

编辑 8/27/18 - Airflow 1.10 现已在 PyPI 上发布!

https://pypi.org/project/apache-airflow/1.10.0/


如何彻底删除 DAG

我们现在在 Airflow ≥ 1.10 中拥有此功能!

PR #2199 (Jira: AIRFLOW-1002) 将 DAG 删除添加到 Airflow 现已合并,允许从所有相关表中完全删除 DAG 条目。

核心 delete_dag(...) 代码现在是实验 API 的一部分,并且有可用的入口点 via the CLI 和 via the REST API。

命令行界面:

airflow delete_dag my_dag_id

REST API(在本地运行网络服务器):

curl -X "DELETE" http://127.0.0.1:8080/api/experimental/dags/my_dag_id

关于 REST API 的警告:确保您的 Airflow 集群 uses authentication 在生产中。

安装/升级到 Airflow 1.10(当前)

要升级,请运行:

export SLUGIFY_USES_TEXT_UNIDECODE=yes

或:

export AIRFLOW_GPL_UNIDECODE=yes

然后:

pip install -U apache-airflow

记得先查看UPDATING.md了解完整详情!

【讨论】:

@Mike 不错。刚刚修好了。谢谢! Airflow 1.10.1 现在添加了从 Web UI 中删除 DAG 的功能 这给了我airflow.exceptions.DagFileExists: Dag id example_bash_operator is still in DagBag. Remove the DAG file first @akki 通过 API 或 UI 删除 DAG 只会从数据库表中删除 DAG 的历史记录,而不是 DAG 文件本身,因此如果您的目标是,最好先删除 DAG 的 .py 文件不要让 DAG 再次运行。 @akki 是的,示例 DAG 很奇怪,因为它们是内置的。在[core] 组下的airflow.cfg 配置文件中,我建议将load_examples 设置为False 用于生产实例。这相当于删除示例中的 DAG 文件。 more info【参考方案2】:

这是我使用带有默认 connection_id 的 PostgresHook 改编的代码。

import sys
from airflow.hooks.postgres_hook import PostgresHook

dag_input = sys.argv[1]
hook=PostgresHook( postgres_conn_id= "airflow_db")

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
    sql="delete from  where dag_id=''".format(t, dag_input)
    hook.run(sql, True)

【讨论】:

我认为您也可以将task_faildag_stats 添加到该表列表中 即使在运行后我仍然在 UI 中看到 dag。当我点击它时,它说 dag 丢失了。有没有办法从 UI 中删除它?【参考方案3】:

不知道为什么 Apache Airflow 没有明显且简单的方法来删除 DAG

归档https://issues.apache.org/jira/browse/AIRFLOW-1002

【讨论】:

此项目的 PR 已打开,但尚未合并。感兴趣的人的链接 - github.com/apache/incubator-airflow/pull/2199。【参考方案4】:

DAG-s 可以在 Airflow 1.10 中删除,但操作的过程和顺序必须正确。 有一个“鸡蛋和鸡肉问题” - 如果您在文件仍然存在时从前端删除 DAG,则重新加载 DAG(因为未删除文件)。如果您先删除文件并刷新页面,则无法再从 Web gui 中删除 DAG。 所以让我从前端删除 DAG 的操作顺序是:

    删除 DAG 文件(在我的情况下,从管道存储库中删除并部署到气流服务器,尤其是调度程序) 请勿刷新 Web GUI。 在 DAG 视图(普通首页)的 Web GUI 中,单击“删除 dag”-> 最右侧的红色图标。 它会从数据库中清除此 DAG 的所有剩余部分。

【讨论】:

你发现了这个序列。你救了我。【参考方案5】:

我刚刚写了一个脚本来删除与特定 dag 相关的所有内容,但这仅适用于 mysql。如果您使用的是 PostgreSQL,您可以编写不同的连接器方法。最初是 Lance 在https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 上发布的命令 我只是把它放在脚本里。希望这可以帮助。格式:python script.py dag_id

import sys
import MySQLdb

dag_input = sys.argv[1]

query = 'delete from xcom where dag_id = "' + dag_input + '"',
        'delete from task_instance where dag_id = "' + dag_input + '"',
        'delete from sla_miss where dag_id = "' + dag_input + '"',
        'delete from log where dag_id = "' + dag_input + '"',
        'delete from job where dag_id = "' + dag_input + '"',
        'delete from dag_run where dag_id = "' + dag_input + '"',
        'delete from dag where dag_id = "' + dag_input + '"' 

def connect(query):
        db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database")
        cur = db.cursor()
        cur.execute(query)
        db.commit()
        db.close()
        return

for value in query:
        print value
        connect(value)

【讨论】:

我注意到dag 表中有一个pickle_id。我们是否应该在从dag 表中删除之前也执行delete from dag_pickle where id = (select pickle_id from public.dag where dag_id = 'my_dag_id')【参考方案6】:

Airflow 1.10.1 已发布。此版本增加了在您从文件系统中删除相应 DAG 后从 Web UI 中删除 DAG 的功能。

查看此票以了解更多详情:

[AIRFLOW-2657] 添加从 web ui 中删除 DAG 的功能

请注意,这实际上并没有从文件系统中删除 DAG,您需要先手动执行此操作,否则 DAG 将被重新加载。

【讨论】:

如果您删除了实际的 DAG 文件,它可以工作。如果 DAG 仍然存在,它将被重新加载 这给了我Dag id example_bash_operator is still in DagBag. Remove the DAG file first. 您需要先从文件系统中删除 Dag 文件。 @Jaco 的评论很有帮助。从 dags 目录中删除 DAG .py 文件后,错误 Dag id example_bash_operator is still in DagBag. Remove the DAG file first. 消失。 这是一个非常有用的功能!但是我想删除一个 DAG,以便删除历史记录并立即重新添加。 Airflow 不接受具有相同文件名的 DAG。我不得不更改 DAG 的文件名,然后 Airflow 将其识别为新的 DAG(具有相同的名称和相同的参数)。【参考方案7】:

我编写了一个脚本,用于删除与默认 SQLite DB 的特定 dag 相关的所有元数据。这是基于耶稣上面的回答,但从 Postgres 改编为 SQLite。用户应将../airflow.db 设置为相对于默认airflow.db 文件(通常为~/airflow)存储script.py 的位置。要执行,请使用python script.py dag_id

import sqlite3
import sys

conn = sqlite3.connect('../airflow.db')
c = conn.cursor()

dag_input = sys.argv[1]

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
    query = "delete from  where dag_id=''".format(t, dag_input)
    c.execute(query)

conn.commit()
conn.close()

【讨论】:

这行得通,至少在 PR 合并之前是一个很好的解决方案【参考方案8】:

对于直接访问airflow db的Postgrespsql控制台的用户,可以简单地执行以下请求来移除DAG:

\set dag_id YOUR_DAG_ID

delete from xcom where dag_id=:'dag_id';
delete from task_instance where dag_id=:'dag_id';
delete from sla_miss where dag_id=:'dag_id';
delete from log where dag_id=:'dag_id';
delete from job where dag_id=:'dag_id';
delete from dag_run where dag_id=:'dag_id';
delete from dag where dag_id=:'dag_id';

类似的(稍作改动)查询适用于其他数据库,例如 MySQL 和 SQLite。

【讨论】:

【参考方案9】:

Airflow 中没有任何内置功能可以为您做到这一点。要删除 DAG,请将其从存储库中删除并删除 Airflow Metastore 表 - dag 中的数据库条目。

【讨论】:

我还必须重新启动运行计划和网络服务器的机器才能完成清理工作。仅仅重启网络服务器和调度器是不够的。【参考方案10】:

您可以清除一组任务实例,就好像它们从未运行过一样:

airflow clear dag_id -s 2017-1-23 -e 2017-8-31

然后从 dags 文件夹中删除 dag 文件

【讨论】:

这可能会导致dag表中有一些未清理的数据 如果您将删除作为重置 dag 的一种方法,它还可能导致气流决定是时候重新运行 DAG【参考方案11】:

根据@OlegYamin 的回答,我正在执行以下操作来删除由postgres 支持的dag,其中airflow 使用public 架构。

delete from public.dag_pickle where id = (
    select pickle_id from public.dag where dag_id = 'my_dag_id'
);
delete from public.dag_run where dag_id = 'my_dag_id';
delete from public.dag_stats where dag_id = 'my_dag_id';
delete from public.log where dag_id = 'my_dag_id';
delete from public.sla_miss where dag_id = 'my_dag_id';
delete from public.task_fail where dag_id = 'my_dag_id';
delete from public.task_instance where dag_id = 'my_dag_id';
delete from public.xcom where dag_id = 'my_dag_id';
delete from public.dag where dag_id = 'my_dag_id';

警告:我不知道第一个删除查询的效果/正确性。这只是假设需要它。

【讨论】:

【参考方案12】:

只需从 mysql 中删除它,对我来说效果很好。从下表中删除它们:

dag

dag_constructor

dag_group_ship dag_pickle dag_run dag_stats

(未来版本中可能会有更多表格) 然后重启 webserver 和 worker。

【讨论】:

【参考方案13】:

版本 >= 1.10.0:

我有气流版本 1.10.2,我尝试执行气流 delete_dag 命令,但该命令引发以下错误:

bash-4.2# 气流 delete_dag dag_id

[2019-03-16 15:37:20,804] settings.py:174 INFO - settings.configure_orm():使用池设置。 pool_size=5,pool_recycle=1800,pid=28224 /usr/lib64/python2.7/site-packages/psycopg2/init.py:144:用户警告:psycopg2 ***包将从 2.8 版重命名;为了继续从二进制安装,请改用“pip install psycopg2-binary”。详情见:http://initd.org/psycopg/docs/install.html#binary-install-from-pypi。 """) 这将删除与指定 DAG 相关的所有现有记录。继续? (y/n)y 回溯(最近一次通话最后): 文件“/usr/bin/airflow”,第 32 行,在 args.func(args) 包装器中的文件“/usr/lib/python2.7/site-packages/airflow/utils/cli.py”,第 74 行 返回 f(*args, **kwargs) 文件“/usr/lib/python2.7/site-packages/airflow/bin/cli.py”,第 258 行,在 delete_dag 引发 AirflowException(错误) airflow.exceptions.AirflowException:服务器错误

虽然我可以通过 Curl 命令删除。 请让我知道是否有人知道此命令的执行,这是已知的还是我做错了什么。

版本

没有删除dag的命令,所以需要先删除dag文件,然后从airflow元数据库中删除所有对dag_id的引用。

警告

您可以重置气流元数据库,您将删除所有内容,包括 dag,但请记住,您还将删除历史记录、池、变量等。

airflow resetdb 然后airflow initdb

【讨论】:

是的,但你应该让人们知道运行 airflow resetdb 将清除数据库中的所有内容,包括任何 poolsvariables,甚至是登录会话 cookie 数据(意味着任何拥有登录会话 cookie 的人)当他们刷新页面时,在他们的浏览器上会收到一个Server Error,他们需要清除他们的 cookie/缓存或使用 Chrome 的隐身模式才能重新登录(这在生产环境中不好,因为它会使用户认为您的气流下降了...))。 另外你必须在运行airflow resetdb之后运行airflow initdb【参考方案14】:

第一 --> 从 $AIRFLOW_HOME/dags 文件夹中删除 DAG 文件。 注意:根据您是否使用过子目录,您可能需要翻遍子目录才能找到 DAG 文件并将其删除。

第二--> 使用删除按钮(圆圈中的 x)从 Web 服务器 UI 中删除 DAG

【讨论】:

【参考方案15】:

在新的气流版本中,UI 中有一个删除 dag(红色 x)按钮,位于 DAG 旁边

【讨论】:

您可以更具体一点,新的气流版本是什么?这种方式删除dag并不会删除文件,应该先做。 如果我没记错的话,我的版本是 1.8。显然,您需要从 dags 文件夹中物理删除 DAG 文件。这不是这里的问题,请更仔细地阅读问题:“如何删除正在运行并显示在 Web GUI 中的特定 DAG?...”这里的问题是如何从 GUI 中删除 DAG,因为它已被缓存那里。为此,您可以按照我上面的解释将其删除。【参考方案16】:

我遇到了鸡/蛋问题,我点击了正在运行的 DAG 实例的绿色小圆圈,它让您“标记为失败”等。我点击了“删除”,它不再卡住了。

【讨论】:

【参考方案17】:

如果您使用 Docker 运行 Airflow,您可以使用 DAG 中的 BashOperator 删除另一个 DAG:

t1 = BashOperator(task_id='delete_dag_task', bash_command=f'airflow dags delete -y dag_id')

dag_id 是 dag 的名称。这使用标准 CLI 命令,而不是自己从元数据库中删除记录。您还需要使用 PythonOperator 从 dags 目录中删除 DAG 文件。

我有这样一个 DAG:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash import BashOperator
import os

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = 
    'start_date': days_ago(1),
    'owner': 'airflow',
    'retries': 1



def delete_dag(**context):
    conf = context["dag_run"].conf
    dag_id = conf["dag_name"]
    t1 = BashOperator(task_id='delete_dag_task', bash_command=f'airflow dags delete -y dag_id')
    t1.execute(context=context)


def delete_dag_file(**context):
    conf = context["dag_run"].conf
    dag_id = conf["dag_name"]
    script_dir = os.path.dirname(__file__)
    dag_file_path = os.path.join(script_dir, '.py'.format(dag_id))
    try:
        os.remove(dag_file_path)
    except OSError:
        pass


with DAG('dag-deleter',
         schedule_interval=None,
         default_args=default_args,
         is_paused_upon_creation=False,
         catchup=False) as dag:

    delete_dag = PythonOperator(
        task_id="delete_dag",
        python_callable=delete_dag,
        provide_context=True)

    delete_dag_file = PythonOperator(
        task_id="delete_dag_file",
        python_callable=delete_dag_file,
        provide_context=True
    )

    delete_dag >> delete_dag_file

我使用 REST API 触发 DAG,在 http 请求中传递以下负载:

"conf": "dag_name": "my_dag_name" 

【讨论】:

【参考方案18】:

从 dags 文件夹中删除 dag(您要删除)并运行 airflow resetdb

或者,您可以进入 airflow_db 并手动从 dag 表中删除这些条目(task_fail、xcom、task_instance、sla_miss、log、job、dag_run、dag、dag_stats)。

【讨论】:

是的,但您应该让人们知道运行 airflow resetdb 将清除数据库中的所有内容,包括任何 poolsvariables,甚至是登录会话 cookie 数据(意味着任何拥有登录会话 cookie 的人)当他们刷新页面时,他们的浏览器上会出现服务器错误,他们需要清除他们的 cookie/缓存或使用 Chrome 的隐身模式才能重新登录(这在生产环境中不好,因为它使用户认为你的气流下降了......))。此外,您必须在运行 airflow resetdb 后运行 airflow initdb 不推荐。更新数据库数据首选upgradedb【参考方案19】:

对于那些仍在寻找答案的人。在 Airflow 版本 1.8 上,删除 DAG 非常困难,您可以参考上面的答案。但是自从 1.9 已经发布,你只需要

删除 dags 文件夹中的 dag 并重启 webserver

【讨论】:

请注意,resetdb 将烧毁并重建整个元数据数据库。无法以这种方式重置一个 DAG。 airflow.apache.org/cli.html#resetdb

以上是关于气流:如何删除 DAG?的主要内容,如果未能解决你的问题,请参考以下文章

如何防止气流回填 dag 运行?

如何与客户运营商验证气流 DAG?

如何防止气流回填dag运行?

气流 - 如何仅“填充 DagBag”一次

每月日期和时间的气流 DAG 调度

Apache Manged Airflow EMR 操作员 DAG 失败