Python 任务调度器 Luigi 可以检测到间接依赖吗?

Posted

技术标签:

【中文标题】Python 任务调度器 Luigi 可以检测到间接依赖吗?【英文标题】:Can Python task scheduler Luigi detect indirect dependencies? 【发布时间】:2017-07-16 21:12:09 【问题描述】:

短版:

在 Python 中是否有一个任务调度器可以做 gmake 所做的事情?特别是,我需要一个递归解决依赖关系的任务调度程序。我查看了 Luigi,但它似乎只解决了直接依赖关系。

加长版:

我正在尝试构建一个以预定义顺序处理大量数据文件的工作流,后面的任务可能直接依赖于一些早期任务的输出,但反过来,这些输出的正确性依赖于甚至更早的任务。

例如,让我们考虑如下依赖映射:

A

当我从任务 C 请求结果时,Luigi 会自动调度 B,然后由于 B 依赖于 A,它会调度 A。所以最终的运行顺序是 [A,B,C]。每个任务都会创建一个正式的输出文件作为成功执行的标志。这对于第一次运行来说很好。

现在,假设我在任务 A 的输入数据中犯了一个错误。显然,我需要重新运行整个链。但是,简单地从 A 中删除输出文件是行不通的。因为 Luigi 看到 B 和 C 的输出,得出结论任务 C 的要求已经满足,不需要运行。我必须从依赖于 A 的任务中删除 ALL 的输出文件,以便它们再次运行。在简单的情况下,我必须删除 A、B 和 C 中的所有输出文件,以便 Luigi 检测到对 A 所做的更改。

这是一个非常不方便的功能。如果我有数十或数百个任务相互之间具有相当复杂的依赖关系,那么当其中一项任务需要重新运行时,真的很难判断哪些任务会受到影响。对于任务调度程序并具有解决依赖关系的能力,我希望 Luigi 能够像 GNU-Make 一样行事,其中递归检查依赖关系,并且当最深的源文件之一发生更改时,将重建最终目标。

我想知道是否有人可以就这个问题提供一些建议。我是否缺少 Luigi 中的一些关键功能?是否有其他任务调度程序可以充当 gmake?我对基于 Python 的包特别感兴趣,并且更喜欢那些支持 Windows 的包。

非常感谢!

【问题讨论】:

【参考方案1】:

似乎可以通过覆盖您的任务的完整方法。您必须在依赖关系图中一直应用它。

def complete(self):
    outputs = self.flatten(self.output())
    if not all(map(lambda output: output.exists(), outputs)):
        return False
    for task in self.flatten(self.requires()):
        if not task.complete():
            for output in outputs:
                if output.exists():
                    output.remove()
            return False
    return True

【讨论】:

嗨@MattMcKnight,非常感谢您指出“完整”方法!我会试试这个解决方案。但是,我想知道为什么 Luigi 默认不这样做?使用 Python for 循环手动解决依赖关系可能效率不高,像我这样的普通用户可能会犯错误和/或做事欠佳。但无论如何,非常感谢! :) 上述代码需要进行多次更正才能正常工作。首先,方法flatten是Luigi定义的,所以需要调用为self.flatten。其次,我发现最好在最后重用默认的complete方法来检查任务本身的状态,所以我写了return super(MyTask, self).complete(),假设继承的类名为MyTask。但是,在运行下游任务时,我仍然收到“文件已存在”错误,因为它们的输出文件没有被删除。知道如何自动更新这些输出文件吗? 很抱歉没有尝试代码,只是想大致了解一种方法。我想除了从完整返回False 之外,还必须删除现有的输出。 Task.complete() 方法没有那么复杂。 github.com/spotify/luigi/blob/master/luigi/task.py#L526 有一些类似清理类型进程的请求。 github.com/spotify/luigi/issues/595#issuecomment-194323344 嗨@MattMcKnight,感谢您的更新!我发现 Luigi LocalTarget 有一个“删除”方法,可用于删除文件。因此,我将调用 output.remove(),而不是使用 os.remove 方法。我会接受这个答案,并发布我的最终测试版本。非常感谢您的帮助! :)【参考方案2】:

确实这很不方便,d6tflow 检查所有上游依赖项的完整性,而不仅仅是 TaskC 的输出。如果你重置TaskA,TaskC也会不完整,自动重新运行。

# reset TaskA => makes TaskC incomplete
TaskA().invalidate() 
d6tflow.preview(TaskC()) # all tasks pending

有关详细信息,请参阅下面的完整示例和d6tflow docs。

import d6tflow
import pandas as pd

class TaskA(d6tflow.tasks.TaskCachePandas):  # save dataframe in memory

    def run(self):        
        self.save(pd.DataFrame('a':range(10))) # quickly save dataframe

class TaskB(d6tflow.tasks.TaskCachePandas):

    def requires(self):
        return TaskA() # define dependency

    def run(self):
        df = self.input().load() # quickly load required data
        df = df*2
        self.save(df)

class TaskC(d6tflow.tasks.TaskCachePandas):

    def requires(self):
        return TaskB()

    def run(self):
        df = self.input().load() 
        df = df*2
        self.save(df)

# Check task dependencies and their execution status
d6tflow.preview(TaskC())
'''
└─--[TaskC- (PENDING)]
   └─--[TaskB- (PENDING)]
      └─--[TaskA- (PENDING)]
'''

# Execute the model training task including dependencies
d6tflow.run(TaskC())

'''
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 TaskC()
'''

# all tasks complete
d6tflow.preview(TaskC())

'''
└─--[TaskC- (COMPLETE)]
   └─--[TaskB- (COMPLETE)]
      └─--[TaskA- (COMPLETE)]
'''

# reset TaskA => makes TaskC incomplete
TaskA().invalidate() 
d6tflow.preview(TaskC())
'''
└─--[TaskC- (PENDING)]
   └─--[TaskB- (PENDING)]
      └─--[TaskA- (PENDING)]
'''

【讨论】:

您能否简要评论一下该库用于解决递归依赖关系的算法的效率?这个库可以在没有明显/明显延迟的情况下处理的任务池的典型大小是多少? 不确定,这在实践中从来都不是问题,我已经用它构建了大规模的机器学习系统。工作发生在任务内部,图书馆只是协调。一位用户训练了 20K ML 模型。

以上是关于Python 任务调度器 Luigi 可以检测到间接依赖吗?的主要内容,如果未能解决你的问题,请参考以下文章

Python Luigi - 满意时继续执行外部任务

在 Luigi 的任务之间传递 Python 对象?

用 luigi 任务替换表加载功能

从 cmd 运行 Luigi 任务 - “没有名为任务的模块”

当任务依赖关系过期时,luigi 可以重新运行任务吗?

如何使 Luigi 任务生成内存列表作为目标