如何在异步应用程序中执行同步 cv2?

Posted

技术标签:

【中文标题】如何在异步应用程序中执行同步 cv2?【英文标题】:How to execute synchronous cv2 in async app? 【发布时间】:2021-05-01 12:56:39 【问题描述】:

我在这里提出愚蠢的问题,但谷歌和聊天室说他们不知道如何解决这个问题。

在异步应用中,我需要通过功能调用从网络摄像头开始录制视频。在录制过程中,我需要执行一些代码,完成后 - 通过另一个调用停止录制。

我的问题在于我使用 cv2 录制视频,而且它是同步的。 即记录会阻止整个事件循环。这不是我需要的工作方式)

有人知道如何处理这个问题,或者有解决类似问题的经验吗? 我试图创建一个类来将标志从阻塞功能中取出 并使用ThreadPoolExecutor 调用我的录音功能。 准确地说:

with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, self.recording)

在这种情况下,如果record 是一个异步函数,我会得到‘courutine was never awaited’异常,这很好,因为threadpoolexecutor 不应该与异步函数一起使用。但。如果record 是默认同步函数,它会阻塞事件循环,就像我直接调用它一样

这是完整的代码:

from cv2 import cv2 as cv
import requests
import time
import asyncio
import concurrent.futures
from datetime import datetime
# from tmp.room import room

cam = cv.VideoCapture(0)  # 0 -> index of camera
width = int(cam.get(cv.CAP_PROP_FRAME_WIDTH) + 0.5)
height = int(cam.get(cv.CAP_PROP_FRAME_HEIGHT) + 0.5)
size = (width, height)
# Define the codec and create VideoWriter object
fourcc = cv.VideoWriter_fourcc(*'XVID')
# out = cv.VideoWriter(f'room.avi', fourcc, 10.0, size, 1)
out = cv.VideoWriter(f'output.avi', fourcc, 10.0, size, 1)


class CameraControls(object):
    def __init__(self):
        self.flag = True

    def recording(self):
        while True:
            ret, frame = cam.read()
            out.write(frame)

            if not self.flag:
                break
        cam.release()
        out.release()
        cv.destroyAllWindows()

    async def do_test(self):
        self.flag = True
        _loop = asyncio.get_event_loop()
        result = await _loop.run_in_executor(None, self.recording)
        await asyncio.sleep(5)
        self.flag = False


async def read_and_send_vid(barcode, auth_token):
    name = f"barcode-str(datetime.today().timestamp()).split('.')[0]"
    url = f"url/name.avi"
    head = 
        "X-Auth-Token": auth_token,
        "Content-Type": "video/x-msvideo",
    
    async with open('output.avi', mode='rb') as file:
        response = requests.post(url=url, head=head, files=file)
        if response.status_code == 201:
            return name

cl1 = CameraControls()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(cl1.do_test())

【问题讨论】:

【参考方案1】:

是的,执行器就是答案 - 只是 asyncio 已经有适当的包装器来启动池并运行您的函数:

...
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, sync_function, *args)

我相信这应该可行。不幸的是,这是我的实际方式 试图运行我的代码(你可以在我的 do_test() 函数中找到它 完整代码sn-p。我是不是做错了什么?

是的。异步代码的特点是您可以编写并行代码,其行为就好像它是串行的,从某种意义上说,您可以完全控制代码可能被并行任务中断的位置。这在避免并发编程的大多数陷阱方面具有很大的优势:对锁、信号量等机制的需求非常少。

不利的一面是,您有责任真正确保您希望并行的所有内容都是并行的:每当您使用“等待”时,当等待的任务完成时,您的代码只会在下一行继续。同时,其他计划任务可能会运行。在这种情况下,这意味着在调用run_in_executor 之后执行await sleep 的行只有在录制结束时才会到达。

在这种情况下,您想要的是安排 run_in_executor 与 循环调用“create_task”,而不是等待它:

    async def do_test(self):
        self.flag = True
        _loop = asyncio.get_event_loop()
        _loop.create_task(_loop.run_in_executor(None, self.recording))
        # Now the task created in the call above starts running in the
        # moment the current co-routine is paused in the call bellow:
        await asyncio.sleep(5)
        self.flag = False

其他强制代码并行运行的方法包括使用asyncio.gather 来实际启动多个协同程序并行 - 假设您需要read_and_send_vid 代码aboce 来沿着另一个函数中的记录运行:


loop.run_until_complete(
    asyncio.gather(
        read_and_send_vid(...), 
        CameraControls().do_test()
))

【讨论】:

我确定这应该可以。不幸的是,这是我尝试运行我的代码的实际方式(你可以在我的完整代码 sn-p 中的do_test() 函数中找到它。我做错了吗?

以上是关于如何在异步应用程序中执行同步 cv2?的主要内容,如果未能解决你的问题,请参考以下文章

代码如何在同步的单核CPU上异步?

立即从同步代码执行异步回调

为啥异步控制器方法像同步方法一样执行?

Java中的线程同步与异步如何理解?

浅析异步执行

同步程序与异步程序执行原理