Luigi 任务方法执行顺序

Posted

技术标签:

【中文标题】Luigi 任务方法执行顺序【英文标题】:Luigi task methods execution order 【发布时间】:2017-05-13 11:49:39 【问题描述】:

Luigi 执行方法(运行、输出、要求)的顺序是什么。我知道需要运行作为检查任务 DAG 有效性的第一个检查,但不应该在 run() 之后运行输出?

我实际上是在尝试等待运行中的 kafka 消息,并基于该消息触发一堆其他任务并返回 LocalTarget。像这样:

def run(self):
    for message in self.consumer:
        self.metadata_key = str(message.value, 'utf-8')
        self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
        if not os.path.exists(self.path):
            os.mkdir(self.path)

        with self.conn.cursor() as cursor:
              all_accounts = cursor.execute('select domainname from tblaccountinfo;')
        for each in all_accounts:
            open(os.path.join(self.path,each)).close()

def output(self):
    return LocalTarget(self.path)

但是,我收到一条错误消息:

例外:必须设置路径或 is_tmp

return LocalTarget(self.path) 行。为什么 luigi 尝试执行 def output() 方法直到 def run() 完成?

【问题讨论】:

【参考方案1】:

当您运行管道(即一个或多个任务)时,Luigi 首先检查其输出目标是否已经存在,如果不存在,则安排任务运行。

Luigi 如何知道它必须检查哪些目标?它只是让他们调用你的任务的output() 方法。

【讨论】:

【参考方案2】:

这不是执行顺序。 Luigi 将在将任务设置为挂起状态之前检查我们要使用 output() 方法创建的文件是否存在。因此,如果您使用任何变量,它希望这些变量得到解决。在这里,您使用的是在 run 方法中创建的 self.path。这就是错误的原因。

您必须在类本身中创建路径并在输出方法中使用,或者在输出方法本身中创建它们并在运行方法中使用它们,如下所示

self.output().open('w').close()

【讨论】:

以上是关于Luigi 任务方法执行顺序的主要内容,如果未能解决你的问题,请参考以下文章

使用 ExecutorService 控制任务执行顺序

scala:如何在SBT设置中定义多个相关任务之间的执行顺序?

.net core多个重复任务执行顺序

如何控制多线程执行顺序

js执行顺序

一个线程执行多个任务,按照顺序执行