如何动态创建 Luigi 任务

Posted

技术标签:

【中文标题】如何动态创建 Luigi 任务【英文标题】:How to Dynamically create a Luigi Task 【发布时间】:2017-01-06 03:07:06 【问题描述】:

我正在为 Luigi Tasks 构建一个包装器,但遇到了 Register 类的障碍,该类实际上是一个 ABC 元类,并且在我创建动态 type 时无法选择。

以下代码或多或少是我用来开发动态类的代码。

class TaskWrapper(object):
    '''Luigi Spark Factory from the provided JobClass

    Args:
        JobClass(ScrubbedClass): The job to wrap
        options: Options as passed into the JobClass
    '''

    def __new__(self, JobClass, **options):
        # Validate we have a good job
        valid_classes = (
            ScrubbedClass01,
            # ScrubbedClass02,
            # ScrubbedClass03,
        )
        if any(vc == JobClass for vc in valid_classes) or not issubclass(JobClass, valid_classes):
            raise TypeError('Job is not the correct class: '.format(JobClass))

        # Build a luigi task class dynamically
        luigi_identifier = 'Task'
        job_name = JobClass.__name__
        job_name = job_name.replace('Pail', '')
        if not job_name.endswith(luigi_identifier):
            job_name += luigi_identifier

        LuigiTask = type(job_name, (PySparkTask, ), )

        for k, v in options.items():
            setattr(LuigiTask, k, luigi.Parameter())

        def main(self, sc, *args):
            job = JobClass(**options)
            return job._run()

        LuigiTask.main = main

        return LuigiTask

但是,当我运行调用函数时,我得到了PicklingError: Can't pickle <class 'abc.ScrubbedNameTask'>: attribute lookup abc.ScrubbedNameTask failed

调用函数:

def create_task(JobClass, **options):
    LuigiTask = TaskWrapper(JobClass, **options)
    # Add parameters
    parameters = 
        d: options.get(d)
        for d in dir(LuigiTask)
        if not d.startswith('_')
        if isinstance(getattr(LuigiTask, d), luigi.Parameter)
        if d in options
    

    task = LuigiTask(**parameters)
    return task

【问题讨论】:

不确定这是否有帮助,但我面临同样的问题并且已经能够通过使用 exec 定义类来解决它,那么我不会遇到这个问题。 【参考方案1】:

当使用ABC 的元类动态创建类时,模块变为abc,当工作人员试图找到任务时,它会转到抽象基类模块并尝试在那里找到它,但是当然不存在。

要解决这个问题,请确保 luigi 知道在哪里可以找到构建类的代码,方法是手动重置 __module__ 变量。

将行改为:

LuigiTask = type(job_name, (PySparkTask, ), '__module__':__name__)

据我所知,这只是 Windows 上的问题。

【讨论】:

以上是关于如何动态创建 Luigi 任务的主要内容,如果未能解决你的问题,请参考以下文章

如何使 Luigi 任务生成内存列表作为目标

创建 Luigi 任务图时出现 JSON 序列化错误

实现 luigi 动态图配置

如何创建动态计划任务?

Python Luigi - 满意时继续执行外部任务

Springboot定时任务原理及如何动态创建定时任务