如果任何任务失败,气流回填将停止

Posted

技术标签:

【中文标题】如果任何任务失败,气流回填将停止【英文标题】:Airflow backfill stops if any task fails 【发布时间】:2018-09-20 15:47:02 【问题描述】:

我正在使用气流 cli 的 backfill 命令手动运行一些回填作业。

 airflow backfill mydag -i -s 2018-01-11T16-00-00 -e 2018-01-31T23-00-00 --reset_dagruns --rerun_failed_tasks

dag 间隔是每小时一次,它有大约 40 个任务。因此,这种回填工作需要一天多的时间才能完成。我需要它在没有监督的情况下运行。但是,我注意到,即使一项任务在回填间隔中的一次运行中失败,整个回填作业也会停止并出现以下异常,我必须再次手动重新启动它。

    Traceback (most recent call last):
      File "/home/ubuntu/airflow/bin/airflow", line 4, in <module>
        __import__('pkg_resources').run_script('apache-airflow==1.10.0', 'airflow')
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/pkg_resources/__init__.py"
    , line 719, in run_script
        self.require(requires)[0].run_script(script_name, ns)
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/pkg_resources/__init__.py", line 1504, in run_script
        exec(code, namespace, namespace)
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.
    5.egg/EGG-INFO/scripts/airflow", line 32, in <module>
        args.func(args)
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.5.egg/airflow/utils/cli.py", line 74, in wrapper
        return f(*args, **kwargs)
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.
    5.egg/airflow/bin/cli.py", line 217, in backfill
        rerun_failed_tasks=args.rerun_failed_tasks,
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.5.egg/airflow/models.py", line 4105, in run
        job.run()
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.
    5.egg/airflow/jobs.py", line 202, in run
        self._execute()
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.5.egg/airflow/utils/db.py", line 74, in wrapper
        return func(*args, **kwargs)
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.
    5.egg/airflow/jobs.py", line 2533, in _execute


 airflow.exceptions.AirflowException: 

Some task instances failed:
('mydag', 'a_task', datetime.datetime(2018, 1, 30, 17, 5, tzinfo=psy
copg2.tz.FixedOffsetTimezone(offset=0, name=None)))

任务实例不依赖于它们之前的实例,因此我不介意一两个任务失败。我需要这份工作才能继续。

我在documentation of backfill 中找不到任何允许我指定此行为的选项。

有没有办法实现我正在寻找的东西?

【问题讨论】:

【参考方案1】:

--donot_pickle 开关添加到backfill 命令may help。

【讨论】:

【参考方案2】:

在使用 backfill 命令时遇到了同样的问题。

尝试了 --donot_pickle 选项并将depends_on_past 设置为 False 没有成功。

可能的解决方法:为 DAG 设置开始日期和 catchup=True,然后在 Web gui 中取消暂停。这就像回填一样。

如果超过 1 个 DAG 运行被标记为失败,我无法让回填 CLI 命令继续运行。

【讨论】:

【参考方案3】:

如果我正确理解你的问题,你寻求的行为可以通过设置来实现

'depends_on_past': False

在 DAG 参数中。

来源:https://airflow.incubator.apache.org/tutorial.html#backfill

【讨论】:

【参考方案4】:

据我了解,当队列中的任务失败时,回填会停止执行。

对我有用的一个技巧是在队列中加载我需要运行的所有任务,而不管失败。也就是说,我将 max_active_runs 增加到一个荒谬的数字,以便执行所有 dag 运行。

例如 max_active_runs: 1000

检查 airflow documentation 关于 dag 的默认参数。

【讨论】:

以上是关于如果任何任务失败,气流回填将停止的主要内容,如果未能解决你的问题,请参考以下文章

气流回填澄清

如何防止气流回填 dag 运行?

气流回填不起作用

如何防止气流回填dag运行?

是否可以同时进行气流回填和调度?

一旦他分配的任务之一因任何原因失败,Java 就会停止执行程序服务