在luigi中使用multiprocessing.Queue

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在luigi中使用multiprocessing.Queue相关的知识,希望对你有一定的参考价值。

我在luigi有一组任务,都需要访问数据库。如果它们位于不同的端口上,我可以同时访问我的数据库最多8个任务(我有允许的端口列表)。我应该如何最好地实现这个似乎与工作人员数量的标准限制类似的限制,即对于我的情况,任务应该在工人空闲且数据库端口空闲时运行。

我尝试在multiprocessing.Queue()中创建一个__main__并将其传递给WrapperTask,它接收它作为luigi.Parameter(),但这会出错并挂起

UserWarning: Parameter "queue" with value <multiprocessing.queues.Queue object at 0x00000000149E4518>" is not of type string.
warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))

这个想法是,如果队列为空,.get()调用会挂起一个Task,并再次继续执行另一个任务.put(port)

这里出了什么问题?或者我采取完全错误的方法来管理luigi的资源?

答案

您应该使用Luigi配置中的“资源”部分。这将确保不超过此数量的工作人员共享全球资源。在这里找到更多https://luigi.readthedocs.io/en/stable/configuration.html#resources

以上是关于在luigi中使用multiprocessing.Queue的主要内容,如果未能解决你的问题,请参考以下文章

使用 Luigi 管道时组织文件?

如何使参数可用于所有 Luigi 任务?

如何在 Luigi 中启用动态需求?

如何从 Python Luigi 登录

在运行()中产生任务时Luigi中的TaskClassAmbigiousException

如何使用 Luigi 处理输出