使动态 Luigi 任务的失败变得非关键

Posted

技术标签:

【中文标题】使动态 Luigi 任务的失败变得非关键【英文标题】:Make failure of a dynamic Luigi task non critical 【发布时间】:2018-06-17 10:31:18 【问题描述】:

我有一个 luigi 工作流程,它通过 ftp 下载一堆大文件并将它们存放在 s3 上。

我有一个任务读取要下载的文件列表,然后创建一堆实际执行下载的任务

这个想法是,这个工作流的结果是一个包含成功下载列表的单个文件,任何失败的下载都会在第二天的下一次运行中重新尝试。

问题在于,如果任何下载任务失败,则永远不会创建成功的下载列表。

这是因为动态创建的任务成为创建它们并从其输出编译列表的主任务的要求。

有没有办法使这些下载任务的失败无关紧要,以便编译列表减去失败任务的输出?

下面的示例代码,GetFiles 是我们从命令行调用的任务。

class DownloadFileFromFtp(luigi.Task):
sourceUrl = luigi.Parameter()

def run(self):
    with self.output().open('w') as output:
        WriteFileFromFtp(sourceUrl, output)

def output(self):
    client = S3Client()
    return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)


@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):

def run(self):

    with self.input().open('r') as fileList:
        files = json.load(fileList)

        tasks = []
        taskOutputs = []

        for file in files:
            task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
            tasks.append(task)
            taskOutputs.append(task.output())

        yield tasks

        successfulDownloads = MakeSuccessfulOutputList(taskOutputs)

    with self.output().open('w') as out:
        json.dump(successfulDownloads, out)

def output(self):
    client = S3Client()
    return S3Target(path='successfulDownloads.json', client=client)

【问题讨论】:

【参考方案1】:

几年后,您一定找到了答案,但这里有一些可以提供帮助的东西。

class DownloadFileFromFtp(luigi.Task):
      sourceUrl = luigi.Parameter()

      def run(self):
           with self.output().open('w') as output:
             WriteFileFromFtp(sourceUrl, output)
      
      def on_failure(self, exception):
          #If the task fails for any reason, 
          #then just indicate the task as completed.
          #From the docs, exception is a string, so you can easily.

          if "FileNotFound" in exception:
              return self.complete(ignore=True)
          return self.complete(ignore=False)

      def complete(self, ignore=False):
          return ignore

      def output(self):
          client = S3Client()
          return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)

【讨论】:

【参考方案2】:

这个答案可能不正确 - 检查THE COMMENTS

我已经阅读了几次文档,但我没有发现诸如非关键故障之类的迹象。话虽如此,这种行为可以通过覆盖DownloadFileFromFtp 中的Task.complete 方法轻松实现,同时仍然能够在GetFiles.run 中使用DownloadFileFromFtp.output

通过return True覆盖,无论下载是否成功,任务DownloadFileFromFtp都会成功。

class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)

    def complete(self,):
        return True

但是请注意,您还可以在 complete 方法中使用更复杂的逻辑 - 例如,仅当任务在运行时遇到特定网络故障时才会失败。

【讨论】:

这将阻止任务运行,即使它从未运行过。当一个任务被调度时,luigi 首先通过在requires 中指定的每个依赖调用complete 来检查它的依赖是否完成。如果 complete 为依赖项返回 True,则不会运行该依赖项。 我看到实现此类可选依赖项的唯一方法是确保可选任务永远不会失败,通过在 run 方法中捕获所有异常(您可能仍然希望记录这些异常,或以某种方式在目标中表示失败,并且可能使用知道失败的目标,如果任务失败,则从其exists 方法返回False 我不知道,这种行为是否记录在某处?无论如何,这可能可以通过让run 方法始终将实例布尔值设置为True 来解决,然后complete 方法返回该值,指示run 方法是否已被调用.我稍后再试,如果是这样的话,我会编辑我的 awnser 设置一个内存变量来跟踪一个任务是否运行是可行的,直到你尝试使用多个 Luigi 工作者。由于工作人员不共享内存,因此他们每个人都会尝试运行任务。 Luigi 希望多个工作人员通过检查他们都可以访问的目标(例如文件或数据库)来进行协调。 @matmat 当然,这是有道理的。我编辑了答案以添加免责声明

以上是关于使动态 Luigi 任务的失败变得非关键的主要内容,如果未能解决你的问题,请参考以下文章

Luigi 没有接下一个要运行的任务,剩下一堆待处理的任务,没有失败的任务

如何使参数可用于所有 Luigi 任务?

如何使 Luigi 任务生成内存列表作为目标

Python Luigi中的事件处理

如何在 Luigi 中启用动态需求?

实现 luigi 动态图配置