如何动态创建 Luigi 任务
Posted
技术标签:
【中文标题】如何动态创建 Luigi 任务【英文标题】:How to Dynamically create a Luigi Task 【发布时间】:2017-01-06 03:07:06 【问题描述】:我正在为 Luigi Tasks 构建一个包装器,但遇到了 Register
类的障碍,该类实际上是一个 ABC 元类,并且在我创建动态 type
时无法选择。
以下代码或多或少是我用来开发动态类的代码。
class TaskWrapper(object):
'''Luigi Spark Factory from the provided JobClass
Args:
JobClass(ScrubbedClass): The job to wrap
options: Options as passed into the JobClass
'''
def __new__(self, JobClass, **options):
# Validate we have a good job
valid_classes = (
ScrubbedClass01,
# ScrubbedClass02,
# ScrubbedClass03,
)
if any(vc == JobClass for vc in valid_classes) or not issubclass(JobClass, valid_classes):
raise TypeError('Job is not the correct class: '.format(JobClass))
# Build a luigi task class dynamically
luigi_identifier = 'Task'
job_name = JobClass.__name__
job_name = job_name.replace('Pail', '')
if not job_name.endswith(luigi_identifier):
job_name += luigi_identifier
LuigiTask = type(job_name, (PySparkTask, ), )
for k, v in options.items():
setattr(LuigiTask, k, luigi.Parameter())
def main(self, sc, *args):
job = JobClass(**options)
return job._run()
LuigiTask.main = main
return LuigiTask
但是,当我运行调用函数时,我得到了PicklingError: Can't pickle <class 'abc.ScrubbedNameTask'>: attribute lookup abc.ScrubbedNameTask failed
。
调用函数:
def create_task(JobClass, **options):
LuigiTask = TaskWrapper(JobClass, **options)
# Add parameters
parameters =
d: options.get(d)
for d in dir(LuigiTask)
if not d.startswith('_')
if isinstance(getattr(LuigiTask, d), luigi.Parameter)
if d in options
task = LuigiTask(**parameters)
return task
【问题讨论】:
不确定这是否有帮助,但我面临同样的问题并且已经能够通过使用 exec 定义类来解决它,那么我不会遇到这个问题。 【参考方案1】:当使用ABC
的元类动态创建类时,模块变为abc
,当工作人员试图找到任务时,它会转到抽象基类模块并尝试在那里找到它,但是当然不存在。
要解决这个问题,请确保 luigi 知道在哪里可以找到构建类的代码,方法是手动重置 __module__
变量。
将行改为:
LuigiTask = type(job_name, (PySparkTask, ), '__module__':__name__)
据我所知,这只是 Windows 上的问题。
【讨论】:
以上是关于如何动态创建 Luigi 任务的主要内容,如果未能解决你的问题,请参考以下文章