在运行()中产生任务时Luigi中的TaskClassAmbigiousException

Posted

技术标签:

【中文标题】在运行()中产生任务时Luigi中的TaskClassAmbigiousException【英文标题】:TaskClassAmbigiousException in Luigi when yielding tasks in run() 【发布时间】:2021-04-24 11:26:09 【问题描述】:

我正在努力解决我不理解的 Luigi 错误。我不知道这是一个已知问题、Luigi 的限制还是我做错了什么。

我在一个涉及许多任务和许多依赖项的实际问题中使用 Luigi。但是,我做了一个玩具示例,其中清楚地显示了此问题。

让我们考虑两个任务,TaskA 和 TaskB,TaskA 需要执行两个先前具有不同 Luigi 参数值的 TaskB 实例。

如果我在 TaksA 的 requires() 方法中编写依赖关系,则不会发生任何不好的事情。所有三个任务都执行了,我得到了我的退出文件。

但如果我在 TaksA 的 run() 方法中编写依赖关系,那么我会得到丑陋的 TaskClassAmbigiousException。

在我的实际问题中,我无法在 requires() 方法中产生任务,因为我需要知道在 quieres() 方法中也产生的前一个任务的结果,所以我尝试在run() 得到了同样的异常。

好的,这是玩具示例的代码。首先,在 requieres() 中生成任务,它可以工作:

import luigi

class TaskB(luigi.Task):
    j = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputBj.txt".format(j=self.j))

    def requires(self):
        pass

    def run(self):
        print_file = 'TaskB' + str(self.j)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

class TaskA(luigi.Task):
    i = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputAi.txt".format(i=self.i))

    def requires(self):
        yield TaskB(j=self.i)
        yield TaskB(j=self.i+1)

    def run(self):
        print_file = ""
        for input_target in self.input():
            with input_target.open('r') as in_file:
                for line in in_file:
                    print_file+=line + 'TaskA' + str(self.i)

        with self.output().open('w') as out_file:
            out_file.write(print_file)
               
if __name__ == '__main__':
   taskA = TaskA(i=2)

其次,在 run() 中产生任务,我得到了这个:

 File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/worker.py", line 1081, in _handle_next_task
    for module, name, params in new_requirements]

  File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/worker.py", line 1081, in <listcomp>
    for module, name, params in new_requirements]

  File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/task_register.py", line 251, in load_task
    task_cls = Register.get_task_cls(task_name)

  File "/home/ppo0011l/.conda/envs/nudge/lib/python3.6/site-packages/luigi/task_register.py", line 181, in get_task_cls
    raise TaskClassAmbigiousException('Task %r is ambiguous' % name)

代码:

import luigi

class TaskB(luigi.Task):
    j = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputBj.txt".format(j=self.j))

    def requires(self):
        pass

    def run(self):
        print_file = 'TaskB' + str(self.j)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

class TaskA(luigi.Task):
    i = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputAi.txt".format(i=self.i))

    def requires(self):
        pass

    def run(self):
        print_file = ""
        target1 = yield TaskB(j=self.i)
        target2 = yield TaskB(j=self.i+1)
        for input_target in [target1, target2]:
            with input_target.open('r') as in_file:
                for line in in_file:
                    print_file+=line + 'TaskA' + str(self.i)

        with self.output().open('w') as out_file:
            out_file.write(print_file)
               
if __name__ == '__main__':
   taskA = TaskA(i=2)
   luigi.build([taskA], workers=1,local_scheduler=True,log_level='WARNING')

编辑:我编辑添加另一个相关问题。因为我想要做的是产生一个任务,其参数取决于先前产生的任务,如果这对我来说可能就足够了:

  def requires(self):
        taskb_target = yield TaskB(j=self.i)
        taskb_target.open('r')
# do something and yield next Task depending on what taskb_target has
        yield TaskB(j=self.i+1)

但不幸的是,这不起作用。 Luigi 说“NoneType”对象没有“open”属性。

但是,当您在 run() 方法中生成任务时,您可以在运行时访问输出。好像有很大的不对称……

第二次编辑:

我做了更多的试验,发现了一个奇怪的结论:我在原始问题中编写的第二段代码(在 .py 文件中时)可以永远执行,即使删除输出文件和所以迫使 luigi 重新执行任务。但是,第一段代码只能执行一次(然后,在第一次执行时,它可以工作!!)。但是如果你删除文件并再次执行代码,你会得到模棱两可的任务错误。

我认为这与 luigi 的 Register 对象有关。但真正让我感到困惑的是,无论我是在 requieres 还是在 run 方法中生成 taskB,这种行为都是不同的。

我仍然不知道在重新定义已经在 luigi 的注册模块中的类任务时是否会出现问题。可能是...我还尝试将类定义放在与主 .py 不同的 .py 中,但是在运行两次时它会中断。只有重启内核才能正常运行,你只有一次机会!

【问题讨论】:

【参考方案1】:

当你使用yield 时,你不会得到返回值,因为你基本上是在returning 来自协程的值。我真的很惊讶在requires 中使用yield 对你有用,因为它导致我的事情崩溃了。您要做的是首先定义然后产生任务。因此,例如,您将:

class TaskA(luigi.Task):
    def run(self):
        task_1 = TaskB(j=self.i)
        yield task_1
        with task_1.output().open('r') as in_file:
            # Get data

        task_2 = TaskB(j=self.i+1, ...)
        yield task_2
        ...

【讨论】:

谢谢,但这并不能解决问题。问题依然存在。我还发现生成任务解析为目标的方式很奇怪,但它在文档中:luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies 无论如何,我做了这个改变,但它仍然不起作用。而且我发现了更糟糕的事情:如果我更改 run 方法以便只产生一个 TaskB 实例,那么在我第一次运行我的代码时,它就可以工作。但是,如果我删除输出文件并再次运行它,它会失败,并出现与原始消息相同的错误。这与 luigi 的 Register 对象有关... 你能确认task_1的输出吗?它确实是在第 5 行导入的? 是的,可以读取输出。我找到了可能的原因,我会回答这个问题,虽然在我看来这是一种错误......【参考方案2】:

好的,现在我发现只有在执行两次脚本时才会出现问题。又试了一个错误,发现问题是,再次导入TaskA和TaskB时,又在luigi.task_register.Register注册了。

其实Register有一个属性_reg,里面包含了所有注册的类。并且在模块的第二次执行中,TaskA 和 TaskB 再次注册。我不知道为什么。这很奇怪,但却是真实的。而且只有在导入TaskA时才会出现,更诡异。

所以我发现解决这个问题的方法如下:

import luigi
from luigi import task_register

class TaskB(luigi.Task):
    j = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputBj.txt".format(j=self.j))

    def requires(self):
        pass

    def run(self):
        print_file = 'TaskB' + str(self.j)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

class TaskA(luigi.Task):
    i = luigi.IntParameter(default=1)

    def output(self):
        return luigi.LocalTarget("data/outputAi.txt".format(i=self.i))

    def requires(self):
        pass

    def run(self):
        print_file = ""
        target1 = yield TaskB(j=self.i)
        target2 = yield TaskB(j=self.i+1)
        for input_target in [target1, target2]:
            with input_target.open('r') as in_file:
                for line in in_file:
                    print_file+=line + 'TaskA' + str(self.i)

        with self.output().open('w') as out_file:
            out_file.write(print_file)

taskA_list = [c for c in task_register.Register._reg if c.__name__ == 'TaskA']
if len(taskA_list) > 1:
   task_register.Register._reg.pop()
   task_register.Register._reg.pop()
               
if __name__ == '__main__':
   taskA = TaskA(i=2)
   luigi.build([taskA], workers=1,local_scheduler=True,log_level='WARNING')

这是一个有效的技巧,可以使模块永远可重新执行,无论是否删除输出文件。但显然不是最优雅的解决方案。我写它只是为了帮助 luigi 开发人员修复它,或者如果我做错了什么来纠正我!

【讨论】:

以上是关于在运行()中产生任务时Luigi中的TaskClassAmbigiousException的主要内容,如果未能解决你的问题,请参考以下文章

当任务依赖关系过期时,luigi 可以重新运行任务吗?

如何使参数可用于所有 Luigi 任务?

如何避免在 Luigi 中与多个工作人员同时运行特定任务

Luigi 没有接下一个要运行的任务,剩下一堆待处理的任务,没有失败的任务

让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?

嵌入式代码中产生bug的几大原因