在装饰器中使用 ProcessPoolExecutor 酸洗 Python 函数失败
Posted
技术标签:
【中文标题】在装饰器中使用 ProcessPoolExecutor 酸洗 Python 函数失败【英文标题】:Pickling Python Function fails with ProcessPoolExecutor in a decorator 【发布时间】:2021-03-21 15:05:08 【问题描述】:所以我问了这个问题并尝试了 ProcessPoolExecutor 方法。我使用装饰器建议的方式如下:
Running Image Manipulation in run_in_executor. Adapting to multiprocessing
import asyncio
import functools
from concurrent import futures
from app.exceptions.errors import ManipulationError
_pool = futures.ProcessPoolExecutor()
def executor(function):
@functools.wraps(function)
def decorator(*args, **kwargs):
try:
partial = functools.partial(function, *args, **kwargs)
loop = asyncio.get_event_loop()
return loop.run_in_executor(_pool, partial)
except Exception as e:
raise ManipulationError(str(e))
return decorator
然后我将它用于如下功能:
@executor
@pil
def blur(image):
frame = image.convert("RGBA")
return frame.filter(ImageFilter.BLUR)
注意@pil
是我制作的另一个装饰器。
def pil(function):
@functools.wraps(function)
def wrapper(image, *args, **kwargs) -> BytesIO:
img = PILManip.pil_image(image)
if img.format == "GIF":
frames = []
for frame in ImageSequence.Iterator(img):
res_frame = function(frame, *args, **kwargs)
frames.append(res_frame)
return PILManip.pil_gif_save(frames), "gif"
elif img.format in ["PNG", "JPEG"]:
img = function(img, *args, **kwargs)
return PILManip.pil_image_save(img), "png"
else:
raise BadImage("Bad Format")
return wrapper
我在 FastApi 路由中这样调用它:
@router.get("/blur/", responses=normal_response)
async def blur_image(url: str):
byt = await Client.image_bytes(url)
img, image_format = await blur(byt)
return Response(img.read(), media_type=f"image/image_format")
我收到一些关于酸洗的错误。
500 Internal Server Error
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 391, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
return await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
await super().__call__(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
await self.middleware_stack(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
raise exc from None
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
await self.app(scope, receive, _send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 25, in __call__
response = await self.dispatch_func(request, self.call_next)
File "/home/codespace/workspace/dagpi-image/app/middleware/timer.py", line 8, in add_process_time_header
response = await call_next(request)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 45, in call_next
task.result()
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 38, in coro
await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 25, in __call__
response = await self.dispatch_func(request, self.call_next)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette_prometheus/middleware.py", line 56, in dispatch
raise e from None
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette_prometheus/middleware.py", line 52, in dispatch
response = await call_next(request)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 45, in call_next
task.result()
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/middleware/base.py", line 38, in coro
await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
raise exc from None
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
await self.app(scope, receive, sender)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
await route.handle(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
await self.app(scope, receive, send)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
response = await func(request)
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
raw_response = await run_endpoint_function(
File "/home/codespace/workspace/dagpi-image/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 148, in run_endpoint_function
return await dependant.call(**values)
File "/home/codespace/workspace/dagpi-image/app/routes/image_routes.py", line 107, in blur_image
img, image_format = await blur(byt)
File "/opt/python/3.8.6/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
obj = _ForkingPickler.dumps(obj)
File "/opt/python/3.8.6/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function blur at 0x7f87524091f0>: it's not the same object as app.image.pil_manipulation.blur
有人知道为什么会这样吗? 我被告知对象必须是可序列化的,我相信 BytesIO 是图像的唯一输入/输出。那应该是可序列化的。
【问题讨论】:
【参考方案1】:装饰器通常会生成不易腌制(序列化)的包装函数,因为它们包含隐藏状态。在处理多处理时,您应该避免使用装饰器并将普通的全局函数发送到run_in_executor
。例如,您可以将 executor
装饰器重写为实用函数:
_pool = concurrent.futures.ProcessPoolExecutor()
async def exec_async(fn, *args):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(_pool, fn, *args)
您可以使用await exec_async(some_function, arg1, arg2, ...)
等待它,而不是使用executor
来装饰函数。同样,您可以将 pil
装饰器重写为另一个实用程序:
def pil(image, transform):
img = PILManip.pil_image(image)
if img.format == "GIF":
frames = []
for frame in ImageSequence.Iterator(img):
res_frame = transform(frame)
frames.append(res_frame)
return PILManip.pil_gif_save(frames), "gif"
elif img.format in ["PNG", "JPEG"]:
img = transform(img)
return PILManip.pil_image_save(img), "png"
else:
raise BadImage("Bad Format")
blur
的实现现在变成了一个普通函数,调用 pil
,并且可以安全地传递给exec_async
:
def blur(image):
def transform(frame):
frame = frame.convert("RGBA")
return frame.filter(ImageFilter.BLUR)
return pil(image, transform)
@router.get("/blur/", responses=normal_response)
async def blur_image(url: str):
byt = await Client.image_bytes(url)
img, image_format = await exec_async(blur, byt)
return Response(img.read(), media_type=f"image/image_format")
注意:以上代码未经测试。
【讨论】:
以上是关于在装饰器中使用 ProcessPoolExecutor 酸洗 Python 函数失败的主要内容,如果未能解决你的问题,请参考以下文章
在装饰器中使用 ProcessPoolExecutor 酸洗 Python 函数失败