Python多处理,从没有缓冲区的远程RTSP相机读取

Posted

技术标签:

【中文标题】Python多处理,从没有缓冲区的远程RTSP相机读取【英文标题】:Python multiprocessing, read from remote RTSP camera without buffer 【发布时间】:2021-12-08 04:51:03 【问题描述】:

我的系统有两个进程,其中:

    “读取器”进程,通过 RTSP 从远程摄像机获取帧; 从“reader”读取的帧被发送到“consumer”,以在其上运行一些计算机视觉算法。

现在,问题是在“阅读器”中以 25 FPS 从相机读取帧,但在“消费者”中分析它们的速度显然要慢得多。然后,我不想让“消费者”分析所有这些,而只分析最新可用的(因此计算机视觉检测是指实时流)。

类似于此处描述的内容:

我设法通过一种变通方法使这项工作按照我想要的方式进行。 基本上,在阅读器中,我检查队列是否为空。如果不是,则表示那里的帧尚未分析,因此我将其删除并替换为当前使用的帧:

launcher.py -> 启动一切

from reader import Reader
from consumer import Consumer
import multiprocessing as mp
from multiprocessing import set_start_method, Queue, Event


def main():

    set_start_method("spawn")
    frames_queue = mp.Queue()
    stop_switch = mp.Event()

    reader = mp.Process(target=Reader, args=(frames_list,), daemon=True)
    consumer = mp.Process(target=Consumer, args=(frames_list, stop_switch), daemon=True)

    reader.start()
    consumer.start()

    while True:
        if stop_switch.is_set():
            reader.terminate()
            consumer.terminate()
            sys.exit(0)


if __name__ == "__main__":
    main()

reader.py -> 从相机读取帧

import cv2

def Reader(thing):
    cap = cv2.VideoCapture('rtsp_address')

    while True:
        ret, frame = cap.read()
        if ret:
            if not frames_queue.empty():
                try:
                    frames_queue.get_nowait()   # discard previous (unprocessed) frame
                except queue.Empty:
                    pass

                try:
                    frames_queue.put(cv2.resize(frame, (1080, 720)), block=False)
                except:
                    pass

在消费者中也有类似的东西:

consumer.py

import cv2

def Consumer(frames_queue, stop_switch):

    while True:

        try:
            frame = frames_queue.get_nowait()      ## get current camera frame from queue
        except:
            pass

        if frame:
            ## do something computationally intensive on frame
            cv2.imshow('output', cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        ## stop system when pressing 'q' key
        key = cv2.waitKey(1)
        if key==ord('q'):
            stop_switch.set()
            break

但我不是很喜欢这个,它似乎有点太乱了。另外,我必须使用所有的 try/except 块来避免竞争条件,其中“阅读器”在放置新帧之前清空队列,而“消费者”尝试同时获取帧。 还有其他更好的方法吗?

【问题讨论】:

【参考方案1】:

我不久前针对多处理视频问题创建了一个solution,我认为它非常适用于这个确切的场景:

使用multiprocessing.shared_memory 支持的数组作为缓冲区,并从正在读取帧的进程连续将视频帧写入其中。然后在另一个进程中,您可以只复制当前帧并对其进行一段时间的处理。在处理过程中写入的任何帧都会被下一帧覆盖,直到您准备好下一帧。

注意视频源的帧率和视频显示的帧率不同(分别受sleep(1/30)cv2.waitKey(1000)限制)

from multiprocessing import Process, Lock
from multiprocessing.shared_memory import SharedMemory
import cv2
from time import sleep
import numpy as np


def process_frames(shm, frame_shape, frame_dtype, frame_lock, exit_flag):
    #create numpy array from buffer
    frame_buffer = np.ndarray(frame_shape, buffer=shm.buf, dtype=frame_dtype)
    try:
        while True:
            if exit_flag.acquire(False): #try (without waiting) to get the lock, and if successful: exit
                break
            with frame_lock:
                frame = frame_buffer.copy() #don't want the data being updated while processing happens
            cv2.imshow('frame', frame)
            cv2.waitKey(1000) #this is needed for cv2 to update the gui, and waiting a long time simulates heavy processing
    except KeyboardInterrupt: #some systems propogate signals to child processes making exit_flag unnecessary. Others don't
        pass
    shm.close()
    print("child exiting")



def read_and_process_frames():
    vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie
    
    #get the first frame to calculate size
    cap = cv2.VideoCapture(vid_device)
    success, frame = cap.read()
    if not success:
        raise Exception("error reading from video")
    
    #create the shared memory for the frame buffer
    frame_buffer_shm = SharedMemory(name="frame_buffer", create=True, size=frame.nbytes)
    frame_buffer = np.ndarray(frame.shape, buffer=frame_buffer_shm.buf, dtype=frame.dtype)
    
    frame_lock = Lock()
    exit_flag = Lock()
    exit_flag.acquire() #start in a locked state. When the reader process successfully acquires; that's the exit signal
    
    processing_process = Process(target=process_frames, args=(frame_buffer_shm, 
                                                              frame.shape, 
                                                              frame.dtype, 
                                                              frame_lock, 
                                                              exit_flag))
    processing_process.start()
    
    try: #use keyboardinterrupt to quit
        while True:
            with frame_lock:
                cap.read(frame_buffer) #read data into frame buffer
            sleep(1/30) #limit framerate-ish for video file (hitting actual framerate is more complicated than 1 line)
    except KeyboardInterrupt:
        print("exiting")
    
    exit_flag.release() #signal the child process to exit
    processing_process.join() #wait for child to exit
    
    #cleanup
    cap.release()
    frame_buffer_shm.close()
    frame_buffer_shm.unlink()


if __name__ == "__main__":
    read_and_process_frames()

【讨论】:

【参考方案2】:

以下是我能想到的一些想法……

...我不是所谓的 MP 专家,但我在生产代码中大量使用 MP 和 MT 进行并行类型的图像处理。

    考虑您所说的后的第一个问题 - 为什么要使用多处理开始?

让我解释一下:您的***别问题可以表述为

“在最近的帧上做一些计算密集型的事情” 警告:这个“东西”所花费的时间远大于生成框架的时间。

在您当前的代码中,您只是丢弃了生产者中除最新帧之外的任何内容,并且正如您所说,生成帧比处理它们要快得多。

那么,考虑到这一点,为什么还要使用 MP?是不是 a) 简单得多 b) 做起来快得多(元代码):


while I_should_be_running:

    frame = get_latest_frame()
    processed = do_work_on_frame()
    save_if_needed(processed)

在这种情况下,您可能应该使用 多线程 来实现在此主循环之外设置/取消设置 I_should_be_running 的方法,但关键是您正在做的工作是全部在一个进程中。

老实说,就 KISS 原则而言,并且个人在尝试实施 MP 时(至少对我而言)总是存在一些皱纹和复杂性,根据您提出的问题,这可能很多更明智的做法是在一个进程中像上面一样运行它...

    如果您使用 MP,并且只想处理最近的帧,为什么要使用队列?

生产者中的代码是

清空当前队列 获取新框架 添加队列

那么,为什么要使用队列呢?相反,也许只使用一个实例变量,例如,生产者写入的most_recent_frame

您需要在进程之间围绕此设置锁定,以便消费者可以锁定它,以便它可以将其复制到随后进行处理。但我相信 MP 库中有各种工具可以做到这一点。

这将避免做不必要的工作(即处理时间)的大量开销:创建未使用的对象实例,在队列中腌制这些对象(请参阅“管道和队列”周围的 https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue 部分 - 这很慢! ) 并在那些未使用的对象被删除后进行垃圾回收。

此外,您使用的队列(理论上)有一个生产者,该生产者不断添加东西,但没有限制。虽然您的代码没有风险(因为您要删除生产者中的所有先前条目),但如果出现问题,这种方法存在潜在的巨大内存溢出风险。

    为什么要使用基于Pipe 的进程b/w 连接对象(即队列)?

当您在 MP 中使用队列时,它基于Pipe,它将创建您正在使用它的对象的腌制副本。对于大型和/或复杂的对象,这可能会非常慢。

如果您担心最大速度,则可以使用ctypes 共享内存对象(再次参见https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue)。

这确实需要锁定(MP 代码提供),但比大型/复杂对象的基于队列的对象pickle/复制要快得多 - 您的frame 可能是这样。

考虑到您尝试以每秒 10 多次的速度生成帧,这种泡菜/复制开销可能会很大。

【讨论】:

【参考方案3】:

几个cmets:

    根据multiprocessing.Queue.empty方法的文档,这是不可靠的,不应该使用。

    正如我之前在您的上一篇文章中所评论的那样,当您应该愿意阻止直到有可用框架时,像在 consumer.py 中所做的那样循环 get_nowait() 调用只是浪费CPU 周期。

    同样地,在您可以发出 stop_switch.wait() 的情况下,在您的主进程测试 stop_switch.is_set() 中循环只会浪费 CPU 周期。

    在您的主进程中,当您检测到stop_switch 已设置时,无需显式终止您创建的守护进程;当主进程终止时,它们将自动终止。

否则,清理上面提到的项目应该会产生我认为不会那么混乱的代码。

ma​​in.py

from reader import Reader
from consumer import Consumer
import multiprocessing as mp
from multiprocessing import set_start_method, Queue, Event

def main():

    set_start_method("spawn")
    frames_queue = mp.Queue()
    stop_switch = mp.Event()

    reader = mp.Process(target=Reader, args=(frames_list,), daemon=True)
    consumer = mp.Process(target=Consumer, args=(frames_list, stop_switch), daemon=True)

    reader.start()
    consumer.start()

    stop_switch.wait()


if __name__ == "__main__":
    main()

reader.py

import cv2

def Reader(thing):
    cap = cv2.VideoCapture('rtsp_address')

    while True:
        ret, frame = cap.read()
        if ret:
            try:
                # discard possible previous (unprocessed) frame
                frames_queue.get_nowait()
            except queue.Empty:
                pass

            try:
                frames_queue.put(cv2.resize(frame, (1080, 720)), block=False)
            except:
                pass

consumer.py

import cv2

def Consumer(frames_queue, stop_switch):

    while True:
        frame = frames_queue.get()     ## get current camera frame from queue
        ## do something computationally intensive on frame
        cv2.imshow('output', cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        ## stop system when pressing 'q' key
        key = cv2.waitKey(1)
        if key==ord('q'):
            stop_switch.set()
            break

【讨论】:

非常感谢,在您在上一篇文章中关于 get_nowait() 部分循环的建议之后,我忘记更新代码了。至于其余的,老实说,我不知道 .wait() 方法的存在,并且我记错了 Daemon 标志,我认为它正好相反(对于这两件事,仍然是新手多处理 :) )。无论如何,你是对的,这些修复确实让一切看起来更好(除了节省 CPU 周期)

以上是关于Python多处理,从没有缓冲区的远程RTSP相机读取的主要内容,如果未能解决你的问题,请参考以下文章

未考虑 Java OpenCV VideoCapture 属性。需要避免最新帧的“当真”。多相机环境

无实时约束处理大量 rtsp 摄像头

监控摄像机的视频是通过啥协议传输的?

在 Python 中从 RTSP 流中读取帧

海康威视相机 RTSP 传输延迟解决方案

海康威视相机 RTSP 传输延迟解决方案