如何在异步应用程序中执行同步 cv2?
Posted
技术标签:
【中文标题】如何在异步应用程序中执行同步 cv2?【英文标题】:How to execute synchronous cv2 in async app? 【发布时间】:2021-05-01 12:56:39 【问题描述】:我在这里提出愚蠢的问题,但谷歌和聊天室说他们不知道如何解决这个问题。
在异步应用中,我需要通过功能调用从网络摄像头开始录制视频。在录制过程中,我需要执行一些代码,完成后 - 通过另一个调用停止录制。
我的问题在于我使用 cv2 录制视频,而且它是同步的。 即记录会阻止整个事件循环。这不是我需要的工作方式)
有人知道如何处理这个问题,或者有解决类似问题的经验吗?
我试图创建一个类来将标志从阻塞功能中取出
并使用ThreadPoolExecutor
调用我的录音功能。
准确地说:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, self.recording)
在这种情况下,如果record
是一个异步函数,我会得到‘courutine was never awaited’异常,这很好,因为threadpoolexecutor
不应该与异步函数一起使用。但。如果record
是默认同步函数,它会阻塞事件循环,就像我直接调用它一样
这是完整的代码:
from cv2 import cv2 as cv
import requests
import time
import asyncio
import concurrent.futures
from datetime import datetime
# from tmp.room import room
cam = cv.VideoCapture(0) # 0 -> index of camera
width = int(cam.get(cv.CAP_PROP_FRAME_WIDTH) + 0.5)
height = int(cam.get(cv.CAP_PROP_FRAME_HEIGHT) + 0.5)
size = (width, height)
# Define the codec and create VideoWriter object
fourcc = cv.VideoWriter_fourcc(*'XVID')
# out = cv.VideoWriter(f'room.avi', fourcc, 10.0, size, 1)
out = cv.VideoWriter(f'output.avi', fourcc, 10.0, size, 1)
class CameraControls(object):
def __init__(self):
self.flag = True
def recording(self):
while True:
ret, frame = cam.read()
out.write(frame)
if not self.flag:
break
cam.release()
out.release()
cv.destroyAllWindows()
async def do_test(self):
self.flag = True
_loop = asyncio.get_event_loop()
result = await _loop.run_in_executor(None, self.recording)
await asyncio.sleep(5)
self.flag = False
async def read_and_send_vid(barcode, auth_token):
name = f"barcode-str(datetime.today().timestamp()).split('.')[0]"
url = f"url/name.avi"
head =
"X-Auth-Token": auth_token,
"Content-Type": "video/x-msvideo",
async with open('output.avi', mode='rb') as file:
response = requests.post(url=url, head=head, files=file)
if response.status_code == 201:
return name
cl1 = CameraControls()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(cl1.do_test())
【问题讨论】:
【参考方案1】:是的,执行器就是答案 - 只是 asyncio
已经有适当的包装器来启动池并运行您的函数:
...
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, sync_function, *args)
我相信这应该可行。不幸的是,这是我的实际方式 试图运行我的代码(你可以在我的 do_test() 函数中找到它 完整代码sn-p。我是不是做错了什么?
是的。异步代码的特点是您可以编写并行代码,其行为就好像它是串行的,从某种意义上说,您可以完全控制代码可能被并行任务中断的位置。这在避免并发编程的大多数陷阱方面具有很大的优势:对锁、信号量等机制的需求非常少。
不利的一面是,您有责任真正确保您希望并行的所有内容都是并行的:每当您使用“等待”时,当等待的任务完成时,您的代码只会在下一行继续。同时,其他计划任务可能会运行。在这种情况下,这意味着在调用run_in_executor
之后执行await sleep
的行只有在录制结束时才会到达。
在这种情况下,您想要的是安排 run_in_executor
与
循环调用“create_task”,而不是等待它:
async def do_test(self):
self.flag = True
_loop = asyncio.get_event_loop()
_loop.create_task(_loop.run_in_executor(None, self.recording))
# Now the task created in the call above starts running in the
# moment the current co-routine is paused in the call bellow:
await asyncio.sleep(5)
self.flag = False
其他强制代码并行运行的方法包括使用asyncio.gather
来实际启动多个协同程序并行 - 假设您需要read_and_send_vid
代码aboce 来沿着另一个函数中的记录运行:
loop.run_until_complete(
asyncio.gather(
read_and_send_vid(...),
CameraControls().do_test()
))
【讨论】:
我确定这应该可以。不幸的是,这是我尝试运行我的代码的实际方式(你可以在我的完整代码 sn-p 中的do_test()
函数中找到它。我做错了吗?以上是关于如何在异步应用程序中执行同步 cv2?的主要内容,如果未能解决你的问题,请参考以下文章