当任务依赖关系过期时,luigi 可以重新运行任务吗?
Posted
技术标签:
【中文标题】当任务依赖关系过期时,luigi 可以重新运行任务吗?【英文标题】:Can luigi rerun tasks when the task dependencies become out of date? 【发布时间】:2015-05-01 20:42:31 【问题描述】:据我所知,luigi.Target
可以存在,也可以不存在。
因此,如果存在luigi.Target
,则不会重新计算。
我正在寻找一种方法来强制重新计算任务,如果它的一个依赖项被修改,或者如果其中一个任务的代码发生变化。
【问题讨论】:
【参考方案1】:实现目标的一种方法是重写 complete(...)
方法。
The documentation for complete
is straightforward.
只需实现一个函数来检查您的约束,如果您想重新计算任务,则返回 False
。
例如,要在更新依赖项时强制重新计算,您可以这样做:
def complete(self):
"""Flag this task as incomplete if any requirement is incomplete or has been updated more recently than this task"""
import os
import time
def mtime(path):
return time.ctime(os.path.getmtime(path))
# assuming 1 output
if not os.path.exists(self.output().path):
return False
self_mtime = mtime(self.output().path)
# the below assumes a list of requirements, each with a list of outputs. YMMV
for el in self.requires():
if not el.complete():
return False
for output in el.output():
if mtime(output.path) > self_mtime:
return False
return True
当任何需求不完整或任何需求的修改时间比当前任务更新或当前任务的输出不存在时,这将返回 False
。
检测代码何时更改更难。您可以使用类似的方案(检查mtime
),但除非每个任务都有自己的文件,否则它会成败。
由于能够覆盖complete
,因此可以实现您想要重新计算的任何逻辑。如果您希望为许多任务使用特定的 complete
方法,我建议将 luigi.Task
子类化,在那里实现您的自定义 complete
,然后从子类继承您的任务。
【讨论】:
第一次执行任务后,complete() 将运行,结果为 True。如果依赖项发生变化,luigi 怎么知道要重新运行 complete()? @selotape complete() 每次调用任务时都会调用。这就是为什么文档建议不要在其中进行任何实际工作并且它是确定性的。为确保每当任务依赖任务发生更改时调用它,修改它们的 complete() 以检查依赖关系树是必要的。 我认为您应该使用output.exists()
之类的东西而不是手动检查路径,因为目标类可能带有exists
的自定义定义。【参考方案2】:
我来晚了,但这里有一个 mixin,它改进了接受的答案以支持多个输入/输出文件。
class MTimeMixin:
"""
Mixin that flags a task as incomplete if any requirement
is incomplete or has been updated more recently than this task
This is based on http://***.com/a/29304506, but extends
it to support multiple input / output dependencies.
"""
def complete(self):
def to_list(obj):
if type(obj) in (type(()), type([])):
return obj
else:
return [obj]
def mtime(path):
return time.ctime(os.path.getmtime(path))
if not all(os.path.exists(out.path) for out in to_list(self.output())):
return False
self_mtime = min(mtime(out.path) for out in to_list(self.output()))
# the below assumes a list of requirements, each with a list of outputs. YMMV
for el in to_list(self.requires()):
if not el.complete():
return False
for output in to_list(el.output()):
if mtime(output.path) > self_mtime:
return False
return True
要使用它,您只需使用例如class MyTask(Mixin, luigi.Task)
声明您的类。
【讨论】:
我喜欢你的 mixin,但 Luigi 抱怨目标文件已经存在。评论不允许我有足够的空间来发布示例,所以我会将其作为非答案答案放在下面。【参考方案3】:上面的代码对我很有效,除了我相信正确的时间戳比较mtime(path)
必须返回一个浮点数而不是一个字符串(“Sat”>“Mon”...[原文如此])。因此,简单地说,
def mtime(path):
return os.path.getmtime(path)
代替:
def mtime(path):
return time.ctime(os.path.getmtime(path))
【讨论】:
【参考方案4】:关于 Shilad Sen 在下面发布的 Mixin 建议,请考虑以下示例:
# Filename: run_luigi.py
import luigi
from MTimeMixin import MTimeMixin
class PrintNumbers(luigi.Task):
def requires(self):
wreturn []
def output(self):
return luigi.LocalTarget("numbers_up_to_10.txt")
def run(self):
with self.output().open('w') as f:
for i in range(1, 11):
f.write("\n".format(i))
class SquaredNumbers(MTimeMixin, luigi.Task):
def requires(self):
return [PrintNumbers()]
def output(self):
return luigi.LocalTarget("squares.txt")
def run(self):
with self.input()[0].open() as fin, self.output().open('w') as fout:
for line in fin:
n = int(line.strip())
out = n * n
fout.write(":\n".format(n, out))
if __name__ == '__main__':
luigi.run()
MTimeMixin 在上面的帖子中。我使用一次运行任务
luigi --module run_luigi SquaredNumbers
然后我触摸文件 numbers_up_to_10.txt 并再次运行任务。然后 Luigi 提出以下抱怨:
File "c:\winpython-64bit-3.4.4.6qt5\python-3.4.4.amd64\lib\site-packages\luigi-2.7.1-py3.4.egg\luigi\local_target.py", line 40, in move_to_final_destination
os.rename(self.tmp_path, self.path)
FileExistsError: [WinError 183] Cannot create a file when that file already exists: 'squares.txt-luigi-tmp-5391104487' -> 'squares.txt'
这可能只是一个 Windows 问题,而不是 Linux 上的问题,“mv a b”可能只是删除旧的 b,如果它已经存在并且没有写保护。我们可以通过 Luigi/local_target.py 的以下补丁来解决这个问题:
def move_to_final_destination(self):
if os.path.exists(self.path):
os.rename(self.path, self.path + time.strftime("_%Y%m%d%H%M%S.txt"))
os.rename(self.tmp_path, self.path)
为了完整起见,这里再次将 Mixin 作为一个单独的文件,来自另一篇文章:
import os
class MTimeMixin:
"""
Mixin that flags a task as incomplete if any requirement
is incomplete or has been updated more recently than this task
This is based on http://***.com/a/29304506, but extends
it to support multiple input / output dependencies.
"""
def complete(self):
def to_list(obj):
if type(obj) in (type(()), type([])):
return obj
else:
return [obj]
def mtime(path):
return os.path.getmtime(path)
if not all(os.path.exists(out.path) for out in to_list(self.output())):
return False
self_mtime = min(mtime(out.path) for out in to_list(self.output()))
# the below assumes a list of requirements, each with a list of outputs. YMMV
for el in to_list(self.requires()):
if not el.complete():
return False
for output in to_list(el.output()):
if mtime(output.path) > self_mtime:
return False
return True
【讨论】:
感谢您的建议!我认为您对 Windows 问题的看法是正确的。我经常使用它,没有文件破坏问题。对于它的价值,这种整体方法让我有点不安。我总是觉得我在扭曲 Luigi,它不打算被使用。以上是关于当任务依赖关系过期时,luigi 可以重新运行任务吗?的主要内容,如果未能解决你的问题,请参考以下文章
在运行()中产生任务时Luigi中的TaskClassAmbigiousException
让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?