如何实现多处理 python 装饰器?

Posted

技术标签:

【中文标题】如何实现多处理 python 装饰器?【英文标题】:How to implement a multiprocessing python decorator? 【发布时间】:2021-12-28 07:35:23 【问题描述】:

我想编写一个包装器,用于在 asyncio 中调用 CPU 要求高的函数。

我希望它像这样使用:

@cpu_bound
def fact(x: int):
    res: int = 1
    while x != 1:
        res *= x
        x -= 1
    return res

async def foo(x: int):
    res = await fact(x)
    ...

起初,我写道:

def cpu_bound(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
    @functools.wraps(func)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        executor =  get_executor() # This is a part where I implemented myself.
        return await loop.run_in_executor(
            executor, functools.partial(func, *args, **kwargs)
        )

    return wrapper

但是,我遇到了酸洗问题。

Traceback(最近一次调用最后一次):文件 "C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\multiprocessing\queues.py", 第 245 行,在 _feed 中 obj = _ForkingPickler.dumps(obj) 文件“C:\Users\Lenovo\AppData\Local\Programs\Python\Python39\lib\multiprocessing\reduction.py”, 第 51 行,在转储中 cls(buf, 协议).dump(obj) _pickle.PicklingError: Can't pickle : it's not the same object as ma​​in.fact

也许原始函数和包装函数不具有相同的id 是问题所在?

那么,有没有办法编写这样的包装器?

我知道我可以使用loop.run_in_executor,但是拥有这样的包装器会很有帮助。

【问题讨论】:

也许你必须像正常功能一样运行它res = await cpu_bound(fact)(x) 【参考方案1】:

也许原始函数和包装的函数没有相同的 id 是问题所在?

在某种程度上,是的。在将函数发送到目标进程之前,它会被腌制,这在您的情况下会失败,因为在装饰器重新绑定后,装饰器范围内的 func 对象与主模块中的 fact 对象不同。 查看this 和this 的问题了解一些背景知识。

根据这些答案,我创建了一个示例,说明如何实现您想要的。 诀窍是创建一个可挑选的“运行器”函数,目标进程可以使用它从某种注册表(例如字典...)中查找您的原始函数并运行它。这当然只是一个例子。您可能不想在装饰器中创建 ProcessPoolExecutor

import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools

original_functions=

def func_runner(name, *args):
    return original_functions[name](*args)

def cpu_bound(func):
    original_functions[func.__name__]=func
    @functools.wraps(func)
    async def wrapper(*args):
        with ProcessPoolExecutor(1) as pool:
            res =  await asyncio.get_running_loop().run_in_executor(
            pool, functools.partial(func_runner, func.__name__, *args)
        )
        return res

    return wrapper

@cpu_bound
def fact(arg):
    return arg

async def foo():
    res = await fact("ok")
    print(res)

if __name__ == "__main__":
    asyncio.run(foo())

【讨论】:

它就像一个魅力!谢谢!

以上是关于如何实现多处理 python 装饰器?的主要内容,如果未能解决你的问题,请参考以下文章

为什么进程池封装在装饰器中不能生效,而多进程可以

Python,如何添加另一个装饰器来过滤现有多装饰器的输出与python中的属性?

使用装饰器作为类来装饰 Python 类

python装饰器

python的wrapt模块实现装饰器

Python如何实现单例模式