Python Luigi - 满意时继续执行外部任务

Posted

技术标签:

【中文标题】Python Luigi - 满意时继续执行外部任务【英文标题】:Python Luigi - Continue with External task when satisfied 【发布时间】:2016-10-25 14:22:45 【问题描述】:

我正在使用 Luigi 管道检查是否存在手动创建的文件,如果存在,则继续执行下一个任务:

import luigi, os

class ExternalFileChecker(luigi.ExternalTask):
    task_namespace='MyTask'
    path = luigi.Parameter()
    def output(self):
        return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))

 class ProcessExternalFile(luigi.Task):
      task_namespace='MyTask'
      path = luigi.Parameter()

      def requires(self):
          return ExternalFileChecker(path=self.path)

      def output(self):
          dirname = self.path
          outfile = os.path.join(dirname, 'processedfile.txt')
          return luigi.LocalTarget(outfile)

      def run(self):
          #do processing

if __name__ == '__main__':
      path = r'D:\MyPath\luigi'
      luigi.run(['MyTask.ProcessExternalFile','--path', path,\
      '--worker-retry-external-tasks','--scheduler-retry-delay', '20',\
      '--worker-keep-alive'])

我想要的是 luigi 在我创建手动文件并将其粘贴到路径后继续。当我这样做时,它不是查找文件并继续执行任务,而是每隔几秒钟重新检查一个新任务:

DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 1.536391 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 5.669132 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
(...)

经过相当长的时间(15-20 分钟左右),luigi 会找到该文件,然后它可以按需要继续。我能做些什么来防止这种延迟?我希望 luigi 在文件存在后立即继续。

【问题讨论】:

【参考方案1】:

需要注意的几点:

    Luigi 工作线程只有在至少有一个任务正在运行时才会退出(或者如果 keep_alive = True,在这种情况下,它将在没有更多待处理任务时退出)。 失败任务有重试逻辑,默认重试间隔为 15 分钟。 重试逻辑的工作原理如下。在指定的重试间隔后,调度器将忘记任务的失败(与单击 UI 中的“原谅失败”按钮相同),并将任务的状态更改为待处理。下次工作人员向调度程序请求工作时,可以将此任务分配给工作人员。 未完成的外部任务算作失败,取决于重试逻辑。 外部任务的重试行为由[worker] 部分中的retry_external_tasks 配置设置控制。

我认为您所观察到的是这样的。您的管道正在运行,任务ProcessExternalFile 失败,然后添加文件,任务在retry_delay 的持续时间内保持失败状态,最后它变为 PENDING 并且工作人员再次获得此任务,此时它发现文件,任务就完成了。

这是否是所需的行为取决于您。如果您希望更快地找到文件,您可以更改重试间隔。或者您可以在run 方法中执行无限while 循环并定期检查文件,并在找到时退出循环。您还可以将 Luigi 配置为完全禁用重试逻辑。

【讨论】:

以上是关于Python Luigi - 满意时继续执行外部任务的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Python Luigi 登录

基于 Python 的异步工作流模块:celery 工作流和 luigi 工作流有啥区别?

luigi框架--关于python运行spark程序

python luigi localTarget 泡菜

如何在 Python Luigi 中使用参数

从 cmd 运行 Luigi 任务 - “没有名为任务的模块”