Luigi 任务方法执行顺序
Posted
技术标签:
【中文标题】Luigi 任务方法执行顺序【英文标题】:Luigi task methods execution order 【发布时间】:2017-05-13 11:49:39 【问题描述】:Luigi 执行方法(运行、输出、要求)的顺序是什么。我知道需要运行作为检查任务 DAG 有效性的第一个检查,但不应该在 run() 之后运行输出?
我实际上是在尝试等待运行中的 kafka 消息,并基于该消息触发一堆其他任务并返回 LocalTarget。像这样:
def run(self):
for message in self.consumer:
self.metadata_key = str(message.value, 'utf-8')
self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
if not os.path.exists(self.path):
os.mkdir(self.path)
with self.conn.cursor() as cursor:
all_accounts = cursor.execute('select domainname from tblaccountinfo;')
for each in all_accounts:
open(os.path.join(self.path,each)).close()
def output(self):
return LocalTarget(self.path)
但是,我收到一条错误消息:
例外:必须设置路径或 is_tmp
在 return LocalTarget(self.path) 行。为什么 luigi 尝试执行 def output() 方法直到 def run() 完成?
【问题讨论】:
【参考方案1】:当您运行管道(即一个或多个任务)时,Luigi 首先检查其输出目标是否已经存在,如果不存在,则安排任务运行。
Luigi 如何知道它必须检查哪些目标?它只是让他们调用你的任务的output()
方法。
【讨论】:
【参考方案2】:这不是执行顺序。 Luigi 将在将任务设置为挂起状态之前检查我们要使用 output() 方法创建的文件是否存在。因此,如果您使用任何变量,它希望这些变量得到解决。在这里,您使用的是在 run 方法中创建的 self.path。这就是错误的原因。
您必须在类本身中创建路径并在输出方法中使用,或者在输出方法本身中创建它们并在运行方法中使用它们,如下所示
self.output().open('w').close()
【讨论】:
以上是关于Luigi 任务方法执行顺序的主要内容,如果未能解决你的问题,请参考以下文章