如何在 Luigi 中启用动态需求?
Posted
技术标签:
【中文标题】如何在 Luigi 中启用动态需求?【英文标题】:How to enable dynamic requirements in Luigi? 【发布时间】:2017-07-22 15:16:40 【问题描述】:我在 Luigi 中构建了一个任务管道。由于此管道将在不同的上下文中使用,因此可能需要在管道的开头或结尾包含更多任务,甚至任务之间的完全不同的依赖关系。
那时我想:“嘿,为什么要在我的配置文件中声明任务之间的依赖关系?”,所以我在我的 config.py 中添加了这样的内容:
PIPELINE_DEPENDENCIES =
"TaskA": [],
"TaskB": ["TaskA"],
"TaskC": ["TaskA"],
"TaskD": ["TaskB", "TaskC"]
我对在整个任务中堆积的参数感到恼火,所以在某些时候我只引入了一个参数task_config
,每个Task
都有,并且每个run()
所需的信息或数据都存储在其中.所以我把PIPELINE_DEPENDENCIES
放在那里。
最后,我将让我定义的每个 Task
都继承自 luigi.Task
和自定义 Mixin 类,这将实现动态 requires()
,如下所示:
class TaskRequirementsFromConfigMixin(object):
task_config = luigi.DictParameter()
def requires(self):
required_tasks = self.task_config["PIPELINE_DEPENDENCIES"]
requirements = [
self._get_task_cls_from_str(required_task)(task_config=self.task_config)
for required_task in required_tasks
]
return requirements
def _get_task_cls_from_str(self, cls_str):
...
不幸的是,这不起作用,因为运行管道会给我以下信息:
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 4 were left pending, among these:
* 4 was not granted run permission by the scheduler:
- 1 TaskA(...)
- 1 TaskB(...)
- 1 TaskC(...)
- 1 TaskD(...)
Did not run any tasks
This progress looks :| because there were tasks that were not granted run permission by the scheduler
===== Luigi Execution Summary =====
还有很多
DEBUG: Not all parameter values are hashable so instance isn't coming from the cache
虽然我不确定这是否相关。
所以: 1.我的错误是什么?它可以修复吗? 2. 还有其他方法可以实现吗?
【问题讨论】:
DEBUG 日志是由于 DictParameter 不可哈希,与权限问题无关。 您没有在 requires 方法中实例化任务,这是故意的吗?它是在辅助方法中以某种方式完成的吗? 我实际上(据我所知),_get_task_cls_from_str
返回一个给定字符串的类,如PIPELINE_DEPENDENCIES
(所以"TaskA
类型为字符串将变成TaskA
type class),然后在requires
的列表推导中给出task_config
参数,从而变成TaskA
类型的对象。
【参考方案1】:
我意识到这是一个老问题,但我最近学会了如何启用动态依赖项。我能够通过使用 WrapperTask 并产生一个 dict 理解(尽管如果你愿意,你也可以做一个列表)和我想在 requires 方法中传递给其他任务的参数来实现这一点。
类似这样的:
class WrapperTaskToPopulateParameters(luigi.WrapperTask):
date = luigi.DateMinuteParameter(interval=30, default=datetime.datetime.today())
def requires(self):
base_params = ['string', 'string', 'string', 'string', 'string', 'string']
modded_params = modded_param:'mod' + base for base in base_params
yield list(SomeTask(param1=key_in_dict_we_created, param2=value_in_dict_we_created) for key_in_dict_we_created,value_in_dict_we_created in modded_params.items())
如果有兴趣,我也可以发布一个使用列表理解的示例。
【讨论】:
以上是关于如何在 Luigi 中启用动态需求?的主要内容,如果未能解决你的问题,请参考以下文章