在装饰器中使用 ProcessPoolExecutor 酸洗 Python 函数失败

Posted

技术标签:

【中文标题】在装饰器中使用 ProcessPoolExecutor 酸洗 Python 函数失败【英文标题】:Pickling Python Function fails with ProcessPoolExecutor in a decorator 【发布时间】:2021-03-21 15:05:08 【问题描述】:

所以我问了这个问题并尝试了 ProcessPoolExecutor 方法。我使用装饰器建议的方式如下:

Running Image Manipulation in run_in_executor. Adapting to multiprocessing

import asyncio
import functools
from concurrent import futures

from app.exceptions.errors import ManipulationError


_pool = futures.ProcessPoolExecutor()


def executor(function):
    @functools.wraps(function)
    def decorator(*args, **kwargs):
        try:
            partial = functools.partial(function, *args, **kwargs)
            loop = asyncio.get_event_loop()
            return loop.run_in_executor(_pool, partial)
        except Exception as e:
            raise ManipulationError(str(e))

    return decorator

然后我将它用于如下功能:

@executor
@pil
def blur(image):
    frame = image.convert("RGBA")
    return frame.filter(ImageFilter.BLUR)

注意@pil 是我制作的另一个装饰器。

def pil(function):
    @functools.wraps(function)
    def wrapper(image, *args, **kwargs) -> BytesIO:
        img = PILManip.pil_image(image)
        if img.format == "GIF":
            frames = []
            for frame in ImageSequence.Iterator(img):
                res_frame = function(frame, *args, **kwargs)
                frames.append(res_frame)
            return PILManip.pil_gif_save(frames), "gif"
        elif img.format in ["PNG", "JPEG"]:
            img = function(img, *args, **kwargs)
            return PILManip.pil_image_save(img), "png"
        else:
            raise BadImage("Bad Format")

    return wrapper

我在 FastApi 路由中这样调用它:


@router.get("/blur/", responses=normal_response)
async def blur_image(url: str):
    byt = await Client.image_bytes(url)
    img, image_format = await blur(byt)
    return Response(img.read(), media_type=f"image/image_format")

我收到一些关于酸洗的错误。

500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 391, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
    await super().__call__(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc from None
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 25, in __call__
    response = await self.dispatch_func(request, self.call_next)
  File "/home/codespace/workspace/dagpi-image/app/middleware/timer.py", line 8, in add_process_time_header
    response = await call_next(request)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 45, in call_next
    task.result()
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 38, in coro
    await self.app(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 25, in __call__
    response = await self.dispatch_func(request, self.call_next)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette_prometheus/middleware.py", line 56, in dispatch
    raise e from None
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette_prometheus/middleware.py", line 52, in dispatch
    response = await call_next(request)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 45, in call_next
    task.result()
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 38, in coro
    await self.app(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc from None
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
    await route.handle(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
    await self.app(scope, receive, send)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
    response = await func(request)
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
    raw_response = await run_endpoint_function(
  File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 148, in run_endpoint_function
    return await dependant.call(**values)
  File "/home/codespace/workspace/dagpi-image/app/routes/image_routes.py", line 107, in blur_image
    img, image_format = await blur(byt)
  File "/opt/python/3.8.6/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/opt/python/3.8.6/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function blur at 0x7f87524091f0>: it's not the same object as app.image.pil_manipulation.blur

有人知道为什么会这样吗? 我被告知对象必须是可序列化的,我相信 BytesIO 是图像的唯一输入/输出。那应该是可序列化的。

【问题讨论】:

【参考方案1】:

装饰器通常会生成不易腌制(序列化)的包装函数,因为它们包含隐藏状态。在处理多处理时,您应该避免使用装饰器并将普通的全局函数发送到run_in_executor。例如,您可以将 executor 装饰器重写为实用函数:

_pool = concurrent.futures.ProcessPoolExecutor()
 
async def exec_async(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(_pool, fn, *args)

您可以使用await exec_async(some_function, arg1, arg2, ...) 等待它,而不是使用executor 来装饰函数。同样,您可以将 pil 装饰器重写为另一个实用程序:

def pil(image, transform):
    img = PILManip.pil_image(image)
    if img.format == "GIF":
        frames = []
        for frame in ImageSequence.Iterator(img):
            res_frame = transform(frame)
            frames.append(res_frame)
        return PILManip.pil_gif_save(frames), "gif"
    elif img.format in ["PNG", "JPEG"]:
        img = transform(img)
        return PILManip.pil_image_save(img), "png"
    else:
        raise BadImage("Bad Format")

blur 的实现现在变成了一个普通函数,调用 pil,并且可以安全地传递给exec_async


def blur(image):
    def transform(frame):
        frame = frame.convert("RGBA")
        return frame.filter(ImageFilter.BLUR)
    return pil(image, transform)
 
@router.get("/blur/", responses=normal_response)
async def blur_image(url: str):
    byt = await Client.image_bytes(url)
    img, image_format = await exec_async(blur, byt)
    return Response(img.read(), media_type=f"image/image_format")

注意:以上代码未经测试。

【讨论】:

以上是关于在装饰器中使用 ProcessPoolExecutor 酸洗 Python 函数失败的主要内容,如果未能解决你的问题,请参考以下文章

在装饰器中使用自变量

在装饰器中使用 ProcessPoolExecutor 酸洗 Python 函数失败

如何在 Python 的方法装饰器中使用类属性?

Wraps 在装饰器中使用时会产生 TypeError

Django:在其他装饰器中重用 login_required 装饰器

无法在装饰器中捕获 pytest 的结果