实现 luigi 动态图配置
Posted
技术标签:
【中文标题】实现 luigi 动态图配置【英文标题】:Implementing luigi dynamic graph configuration 【发布时间】:2018-12-05 09:56:52 【问题描述】:我是 luigi 的新手,在为我们的 ML 工作设计管道时遇到了它。虽然它不适合我的特定用例,但它有很多额外的功能,我决定让它适合。
基本上我一直在寻找一种能够持久化自定义构建管道的方法,从而使其结果可重复且更易于部署,在阅读了大部分在线教程后,我尝试使用现有的 @987654323 实现我的序列化@ 配置和命令行机制,它可能足以满足任务的参数,但它无法序列化我的管道的 DAG 连接,所以我决定有一个 WrapperTask 接收 json config file
然后将创建所有任务实例并连接 luigi 任务的所有输入输出通道(完成所有管道)。
我特此附上一个小测试程序供您查阅:
import random
import luigi
import time
import os
class TaskNode(luigi.Task):
i = luigi.IntParameter() # node ID
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.required = []
def set_required(self, required=None):
self.required = required # set the dependencies
return self
def requires(self):
return self.required
def output(self):
return luigi.LocalTarget('01.txt'.format(self.__class__.__name__, self.i))
def run(self):
with self.output().open('w') as outfile:
outfile.write('inside 01\n'.format(self.__class__.__name__, self.i))
self.process()
def process(self):
raise NotImplementedError(self.__class__.__name__ + " must implement this method")
class FastNode(TaskNode):
def process(self):
time.sleep(1)
class SlowNode(TaskNode):
def process(self):
time.sleep(2)
# This WrapperTask builds all the nodes
class All(luigi.WrapperTask):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
num_nodes = 513
classes = TaskNode.__subclasses__()
self.nodes = []
for i in reversed(range(num_nodes)):
cls = random.choice(classes)
dependencies = random.sample(self.nodes, (num_nodes - i) // 35)
obj = cls(i=i)
if dependencies:
obj.set_required(required=dependencies)
else:
obj.set_required(required=None)
# delete existing output causing a build all
if obj.output().exists():
obj.output().remove()
self.nodes.append(obj)
def requires(self):
return self.nodes
if __name__ == '__main__':
luigi.run()
因此,基本上,正如问题标题中所述,这侧重于动态依赖关系并生成a 513 node dependency DAG
和p=1/35 connectivity probability
,它还将All(如make all)类定义为需要所有节点的WrapperTask为它被认为完成而构建(我有一个版本,它只将它连接到连接的 DAG 组件的头部,但我不想过于复杂)。
有没有更标准的(Luigic)方式来实现这一点?特别注意 TaskNode init 和 set_required 方法不太复杂,我这样做只是因为在 init 方法中接收参数与 luigi 注册参数的方式发生冲突。我还尝试了其他几种方法,但这基本上是最体面的一种(有效)
如果没有标准的方式,我仍然很想听听您对我在完成框架实施之前计划的方式的任何见解。
【问题讨论】:
相关未回答问题***.com/questions/42563175/… 你有没有考虑过 luigi 上的气流?我用它或多或少地完全按照您在问题中所说的:airflow.apache.org 我看了一眼,但它似乎更集中于 cmdline 而不是 python,这只是一个懒惰的第一印象吗? 我没必要这么说。它带有一个 Web 用户界面,并以您的 DAG 的计划或触发执行为中心。如果愿意,您可以从命令行手动运行 DAG。但是,它是一个非常大的包,其中有一些依赖项可能对您的情况来说太过分了。例如,它至少需要一个 sqlite 数据库。 嗯,听起来很有趣,有传言说它现在比 luigi 维护得更好,我可能会在下一轮(或我的下一份工作)看看它,但我有点全力以赴现在和路易吉在一起:/ 【参考方案1】:我昨天answered a similar question 做了一个演示。我几乎完全基于example in the docs.。在文档中,通过yeild
ing 任务分配动态依赖项似乎是他们喜欢的方式。
luigi.Config
和动态依赖关系可能会给你一个几乎无限灵活性的管道。他们还描述了一个虚拟的Task
,它调用多个依赖链here,,这可以给你更多的控制权。
【讨论】:
以上是关于实现 luigi 动态图配置的主要内容,如果未能解决你的问题,请参考以下文章
Springboot项目中运用vue+ElementUI+echarts前后端交互实现动态圆环图