Python多处理,从没有缓冲区的远程RTSP相机读取
Posted
技术标签:
【中文标题】Python多处理,从没有缓冲区的远程RTSP相机读取【英文标题】:Python multiprocessing, read from remote RTSP camera without buffer 【发布时间】:2021-12-08 04:51:03 【问题描述】:我的系统有两个进程,其中:
-
“读取器”进程,通过 RTSP 从远程摄像机获取帧;
从“reader”读取的帧被发送到“consumer”,以在其上运行一些计算机视觉算法。
现在,问题是在“阅读器”中以 25 FPS 从相机读取帧,但在“消费者”中分析它们的速度显然要慢得多。然后,我不想让“消费者”分析所有这些,而只分析最新可用的(因此计算机视觉检测是指实时流)。
类似于此处描述的内容:
我设法通过一种变通方法使这项工作按照我想要的方式进行。 基本上,在阅读器中,我检查队列是否为空。如果不是,则表示那里的帧尚未分析,因此我将其删除并替换为当前使用的帧:
launcher.py -> 启动一切
from reader import Reader
from consumer import Consumer
import multiprocessing as mp
from multiprocessing import set_start_method, Queue, Event
def main():
set_start_method("spawn")
frames_queue = mp.Queue()
stop_switch = mp.Event()
reader = mp.Process(target=Reader, args=(frames_list,), daemon=True)
consumer = mp.Process(target=Consumer, args=(frames_list, stop_switch), daemon=True)
reader.start()
consumer.start()
while True:
if stop_switch.is_set():
reader.terminate()
consumer.terminate()
sys.exit(0)
if __name__ == "__main__":
main()
reader.py -> 从相机读取帧
import cv2
def Reader(thing):
cap = cv2.VideoCapture('rtsp_address')
while True:
ret, frame = cap.read()
if ret:
if not frames_queue.empty():
try:
frames_queue.get_nowait() # discard previous (unprocessed) frame
except queue.Empty:
pass
try:
frames_queue.put(cv2.resize(frame, (1080, 720)), block=False)
except:
pass
在消费者中也有类似的东西:
consumer.py
import cv2
def Consumer(frames_queue, stop_switch):
while True:
try:
frame = frames_queue.get_nowait() ## get current camera frame from queue
except:
pass
if frame:
## do something computationally intensive on frame
cv2.imshow('output', cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
## stop system when pressing 'q' key
key = cv2.waitKey(1)
if key==ord('q'):
stop_switch.set()
break
但我不是很喜欢这个,它似乎有点太乱了。另外,我必须使用所有的 try/except 块来避免竞争条件,其中“阅读器”在放置新帧之前清空队列,而“消费者”尝试同时获取帧。 还有其他更好的方法吗?
【问题讨论】:
【参考方案1】:我不久前针对多处理视频问题创建了一个solution,我认为它非常适用于这个确切的场景:
使用multiprocessing.shared_memory
支持的数组作为缓冲区,并从正在读取帧的进程连续将视频帧写入其中。然后在另一个进程中,您可以只复制当前帧并对其进行一段时间的处理。在处理过程中写入的任何帧都会被下一帧覆盖,直到您准备好下一帧。
注意视频源的帧率和视频显示的帧率不同(分别受sleep(1/30)
和cv2.waitKey(1000)
限制)
from multiprocessing import Process, Lock
from multiprocessing.shared_memory import SharedMemory
import cv2
from time import sleep
import numpy as np
def process_frames(shm, frame_shape, frame_dtype, frame_lock, exit_flag):
#create numpy array from buffer
frame_buffer = np.ndarray(frame_shape, buffer=shm.buf, dtype=frame_dtype)
try:
while True:
if exit_flag.acquire(False): #try (without waiting) to get the lock, and if successful: exit
break
with frame_lock:
frame = frame_buffer.copy() #don't want the data being updated while processing happens
cv2.imshow('frame', frame)
cv2.waitKey(1000) #this is needed for cv2 to update the gui, and waiting a long time simulates heavy processing
except KeyboardInterrupt: #some systems propogate signals to child processes making exit_flag unnecessary. Others don't
pass
shm.close()
print("child exiting")
def read_and_process_frames():
vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie
#get the first frame to calculate size
cap = cv2.VideoCapture(vid_device)
success, frame = cap.read()
if not success:
raise Exception("error reading from video")
#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer", create=True, size=frame.nbytes)
frame_buffer = np.ndarray(frame.shape, buffer=frame_buffer_shm.buf, dtype=frame.dtype)
frame_lock = Lock()
exit_flag = Lock()
exit_flag.acquire() #start in a locked state. When the reader process successfully acquires; that's the exit signal
processing_process = Process(target=process_frames, args=(frame_buffer_shm,
frame.shape,
frame.dtype,
frame_lock,
exit_flag))
processing_process.start()
try: #use keyboardinterrupt to quit
while True:
with frame_lock:
cap.read(frame_buffer) #read data into frame buffer
sleep(1/30) #limit framerate-ish for video file (hitting actual framerate is more complicated than 1 line)
except KeyboardInterrupt:
print("exiting")
exit_flag.release() #signal the child process to exit
processing_process.join() #wait for child to exit
#cleanup
cap.release()
frame_buffer_shm.close()
frame_buffer_shm.unlink()
if __name__ == "__main__":
read_and_process_frames()
【讨论】:
【参考方案2】:以下是我能想到的一些想法……
...我不是所谓的 MP 专家,但我在生产代码中大量使用 MP 和 MT 进行并行类型的图像处理。
-
考虑您所说的后的第一个问题 - 为什么要使用多处理开始?
让我解释一下:您的***别问题可以表述为
“在最近的帧上做一些计算密集型的事情” 警告:这个“东西”所花费的时间远大于生成框架的时间。在您当前的代码中,您只是丢弃了生产者中除最新帧之外的任何内容,并且正如您所说,生成帧比处理它们要快得多。
那么,考虑到这一点,为什么还要使用 MP?是不是 a) 简单得多 b) 做起来快得多(元代码):
while I_should_be_running:
frame = get_latest_frame()
processed = do_work_on_frame()
save_if_needed(processed)
在这种情况下,您可能应该使用 多线程 来实现在此主循环之外设置/取消设置 I_should_be_running
的方法,但关键是您正在做的工作是全部在一个进程中。
老实说,就 KISS 原则而言,并且个人在尝试实施 MP 时(至少对我而言)总是存在一些皱纹和复杂性,根据您提出的问题,这可能很多更明智的做法是在一个进程中像上面一样运行它...
-
如果您使用 MP,并且只想处理最近的帧,为什么要使用队列?
生产者中的代码是
清空当前队列 获取新框架 添加队列那么,为什么要使用队列呢?相反,也许只使用一个实例变量,例如,生产者写入的most_recent_frame
。
您需要在进程之间围绕此设置锁定,以便消费者可以锁定它,以便它可以将其复制到随后进行处理。但我相信 MP 库中有各种工具可以做到这一点。
这将避免做不必要的工作(即处理时间)的大量开销:创建未使用的对象实例,在队列中腌制这些对象(请参阅“管道和队列”周围的 https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue 部分 - 这很慢! ) 并在那些未使用的对象被删除后进行垃圾回收。
此外,您使用的队列(理论上)有一个生产者,该生产者不断添加东西,但没有限制。虽然您的代码没有风险(因为您要删除生产者中的所有先前条目),但如果出现问题,这种方法存在潜在的巨大内存溢出风险。
-
为什么要使用基于
Pipe
的进程b/w 连接对象(即队列)?
当您在 MP 中使用队列时,它基于Pipe
,它将创建您正在使用它的对象的腌制副本。对于大型和/或复杂的对象,这可能会非常慢。
如果您担心最大速度,则可以使用ctypes
共享内存对象(再次参见https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue)。
这确实需要锁定(MP 代码提供),但比大型/复杂对象的基于队列的对象pickle/复制要快得多 - 您的frame
可能是这样。
考虑到您尝试以每秒 10 多次的速度生成帧,这种泡菜/复制开销可能会很大。
【讨论】:
【参考方案3】:几个cmets:
根据multiprocessing.Queue.empty
方法的文档,这是不可靠的,不应该使用。
正如我之前在您的上一篇文章中所评论的那样,当您应该愿意阻止直到有可用框架时,像在 consumer.py 中所做的那样循环 get_nowait()
调用只是浪费CPU 周期。
同样地,在您可以发出 stop_switch.wait()
的情况下,在您的主进程测试 stop_switch.is_set()
中循环只会浪费 CPU 周期。
在您的主进程中,当您检测到stop_switch
已设置时,无需显式终止您创建的守护进程;当主进程终止时,它们将自动终止。
否则,清理上面提到的项目应该会产生我认为不会那么混乱的代码。
main.py
from reader import Reader
from consumer import Consumer
import multiprocessing as mp
from multiprocessing import set_start_method, Queue, Event
def main():
set_start_method("spawn")
frames_queue = mp.Queue()
stop_switch = mp.Event()
reader = mp.Process(target=Reader, args=(frames_list,), daemon=True)
consumer = mp.Process(target=Consumer, args=(frames_list, stop_switch), daemon=True)
reader.start()
consumer.start()
stop_switch.wait()
if __name__ == "__main__":
main()
reader.py
import cv2
def Reader(thing):
cap = cv2.VideoCapture('rtsp_address')
while True:
ret, frame = cap.read()
if ret:
try:
# discard possible previous (unprocessed) frame
frames_queue.get_nowait()
except queue.Empty:
pass
try:
frames_queue.put(cv2.resize(frame, (1080, 720)), block=False)
except:
pass
consumer.py
import cv2
def Consumer(frames_queue, stop_switch):
while True:
frame = frames_queue.get() ## get current camera frame from queue
## do something computationally intensive on frame
cv2.imshow('output', cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
## stop system when pressing 'q' key
key = cv2.waitKey(1)
if key==ord('q'):
stop_switch.set()
break
【讨论】:
非常感谢,在您在上一篇文章中关于 get_nowait() 部分循环的建议之后,我忘记更新代码了。至于其余的,老实说,我不知道 .wait() 方法的存在,并且我记错了 Daemon 标志,我认为它正好相反(对于这两件事,仍然是新手多处理 :) )。无论如何,你是对的,这些修复确实让一切看起来更好(除了节省 CPU 周期)以上是关于Python多处理,从没有缓冲区的远程RTSP相机读取的主要内容,如果未能解决你的问题,请参考以下文章