如何在 OSX 上的单独进程中读取网络摄像头?
Posted
技术标签:
【中文标题】如何在 OSX 上的单独进程中读取网络摄像头?【英文标题】:How to read webcam in separate process on OSX? 【发布时间】:2017-09-20 05:30:02 【问题描述】:我正在读取 OSX 上的网络摄像头,它适用于这个简单的脚本:
import cv2
camera = cv2.VideoCapture(0)
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms before going to the next frame
except KeyboardInterrupt:
# cleanup the camera and close any open windows
camera.release()
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
我现在想在一个单独的进程中阅读视频,我有一个更长的脚本,并且在 Linux 上的一个单独的进程中正确地读出了视频:
import numpy as np
import time
import ctypes
import argparse
from multiprocessing import Array, Value, Process
import cv2
class VideoCapture:
"""
Class that handles video capture from device or video file
"""
def __init__(self, device=0, delay=0.):
"""
:param device: device index or video filename
:param delay: delay between frame captures in seconds(floating point is allowed)
"""
self._cap = cv2.VideoCapture(device)
self._delay = delay
def _proper_frame(self, delay=None):
"""
:param delay: delay between frames capture(in seconds)
:param finished: synchronized wrapper for int(see multiprocessing.Value)
:return: frame
"""
snapshot = None
correct_img = False
fail_counter = -1
while not correct_img:
# Capture the frame
correct_img, snapshot = self._cap.read()
fail_counter += 1
# Raise exception if there's no output from the device
if fail_counter > 10:
raise Exception("Capture: exceeded number of tries to capture the frame.")
# Delay before we get a new frame
time.sleep(delay)
return snapshot
def get_size(self):
"""
:return: size of the captured image
"""
return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)
def get_stream_function(self):
"""
Returns stream_function object function
"""
def stream_function(image, finished):
"""
Function keeps capturing frames until finished = 1
:param image: shared numpy array for multiprocessing(see multiprocessing.Array)
:param finished: synchronized wrapper for int(see multiprocessing.Value)
:return: nothing
"""
# Incorrect input array
if image.shape != self.get_size():
raise Exception("Capture: improper size of the input image")
print("Capture: start streaming")
# Capture frame until we get finished flag set to True
while not finished.value:
image[:, :, :] = self._proper_frame(self._delay)
# Release the device
self.release()
return stream_function
def release(self):
self._cap.release()
def main():
# Add program arguments
parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-output', dest="output", default="output.avi", help='name of the output video file')
parser.add_argument('-log', dest="log", default="frames.log", help='name of the log file')
parser.add_argument('-fps', dest="fps", default=25., help='frames per second value')
# Read the arguments if any
result = parser.parse_args()
fps = float(result.fps)
output = result.output
log = result.log
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()
# Define shared variables(which are synchronised so race condition is excluded)
shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
frame = np.ctypeslib.as_array(shared_array_base.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
finished = Value('i', 0)
# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start() # Launch capture process
# Sleep for some time to allow videocapture start working first
time.sleep(2)
# Termination function
def terminate():
print("Main: termination")
finished.value = True
# Wait for all processes to finish
time.sleep(1)
# Terminate working processes
video_process.terminate()
# The capturing works until keyboard interrupt is pressed.
while True:
try:
# Display the resulting frame
cv2.imshow('frame', frame)
cv2.waitKey(1) # Display it at least one ms before going to the next frame
time.sleep(0.1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
terminate()
break
if __name__ == '__main__':
main()
这在 Linux 上运行良好,但在 OSX 上我遇到了麻烦,因为它似乎无法对创建的 cv2.VideoCapture(device)
对象(存储在 var self._cap
中)执行 .read()
。
经过一番搜索,我找到了this SO answer,它建议使用Billiard,它是python 多处理的替代品,据说有一些非常有用的改进。所以在文件的顶部,我只是在之前的多处理导入之后添加了导入(有效地覆盖了multiprocessing.Process
):
from billiard import Process, forking_enable
在video_process
变量的实例化之前,我使用forking_enable
,如下所示:
forking_enable(0) # Supposedly this is all I need for billiard to do it's magic
video_process = Process(target=stream, args=(frame, finished))
所以在这个版本 (here on pastebin) 中,我再次运行该文件,这给了我这个错误:
pickle.PicklingError: Can't pickle : it's not found as main.stream_function
搜索该错误导致我找到an SO question with a long list of answers,其中一个给了我使用dill serialization lib 解决此问题的建议。然而,该库应该与Pathos multiprocessing fork 一起使用。所以我只是尝试从
更改我的多处理导入行from multiprocessing import Array, Value, Process
到
from pathos.multiprocessing import Array, Value, Process
但Array
、Value
和 Process
似乎都不存在于 pathos.multiprocessing
包中。
从这一点开始,我完全迷失了。我正在搜索我几乎没有足够知识的东西,我什至不知道我需要在哪个方向进行搜索或调试。
那么有比我更聪明的人可以帮助我在单独的过程中捕获视频吗?欢迎所有提示!
【问题讨论】:
试试mp4v
作为你的fourcc。
@MarkSetchell - 但是在多处理代码中,我什至没有尝试编写视频,因为我什至无法从网络摄像头中读出它。问题是阅读,而不是写作。我还将从初始脚本中删除文字,以免混淆人们。您知道在多处理代码中读取网络摄像头有什么问题吗?
我是pathos
和dill
的作者。您可能想尝试multiprocess
,它是pathos
下的库,但界面与multiprocessing
完全相同。在那里您会找到Array
、Value
和Process
对象。
【参考方案1】:
您的第一个问题是您无法在forked
进程中访问网络摄像头。当外部库与fork
一起使用时会出现几个问题,因为 fork 操作不会清除父进程打开的所有文件描述符,从而导致奇怪的行为。该库通常对 linux 上的此类问题更健壮,但在 2 个进程之间共享 IO 对象(例如 cv2.VideoCapture
)并不是一个好主意。
当您使用billard.forking_enabled
并将其设置为False
时,您要求库不要使用fork
来生成新进程,而是使用spawn
或forkserver
方法,这些方法在关闭所有文件时更简洁描述符,但启动速度也较慢,这在您的情况下应该不是问题。如果您使用的是python3.4+
,则可以使用multiprocessing.set_start_method('forkserver')
来执行此操作。
当您使用其中一种方法时,目标函数和参数需要被序列化以传递给子进程。默认情况下,序列化是使用pickle
完成的,它有几个流程,正如您提到的那样,无法序列化本地定义的对象和cv2.VideoCapture
。但是你可以简化你的程序,让你的Process
picklelisable 的所有论点。这是解决您问题的一个尝试:
import numpy as np
import time
import ctypes
from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2
class VideoCapture:
"""
Class that handles video capture from device or video file
"""
def __init__(self, device=0, delay=0.):
"""
:param device: device index or video filename
:param delay: delay between frame captures in seconds(float allowed)
"""
self._delay = delay
self._device = device
self._cap = cv2.VideoCapture(device)
assert self._cap.isOpened()
def __getstate__(self):
self._cap.release()
return (self._delay, self._device)
def __setstate__(self, state):
self._delay, self._device = state
self._cap = cv2.VideoCapture(self._device)
assert self._cap.grab(), "The child could not grab the video capture"
def _proper_frame(self, delay=None):
"""
:param delay: delay between frames capture(in seconds)
:param finished: synchronized wrapper for int
:return: frame
"""
snapshot = None
correct_img = False
fail_counter = -1
while not correct_img:
# Capture the frame
correct_img, snapshot = self._cap.read()
fail_counter += 1
# Raise exception if there's no output from the device
if fail_counter > 10:
raise Exception("Capture: exceeded number of tries to capture "
"the frame.")
# Delay before we get a new frame
time.sleep(delay)
return snapshot
def get_size(self):
"""
:return: size of the captured image
"""
return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)
def release(self):
self._cap.release()
def stream(capturer, image, finished):
"""
Function keeps capturing frames until finished = 1
:param image: shared numpy array for multiprocessing
:param finished: synchronized wrapper for int
:return: nothing
"""
shape = capturer.get_size()
# Define shared variables
frame = np.ctypeslib.as_array(image.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
# Incorrect input array
if frame.shape != capturer.get_size():
raise Exception("Capture: improper size of the input image")
print("Capture: start streaming")
# Capture frame until we get finished flag set to True
while not finished.value:
frame[:, :, :] = capturer._proper_frame(capturer._delay)
# Release the device
capturer.release()
def main():
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
# Define shared variables
shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
frame = np.ctypeslib.as_array(shared_array_base.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
finished = Value('i', 0)
# Start processes which run in parallel
video_process = Process(target=stream,
args=(cap, shared_array_base, finished))
video_process.start() # Launch capture process
# Sleep for some time to allow videocapture start working first
time.sleep(2)
# Termination function
def terminate():
print("Main: termination")
finished.value = True
# Wait for all processes to finish
time.sleep(1)
# Terminate working processes
video_process.join()
# The capturing works until keyboard interrupt is pressed.
while True:
try:
# Display the resulting frame
cv2.imshow('frame', frame)
# Display it at least one ms before going to the next frame
time.sleep(0.1)
cv2.waitKey(1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
terminate()
break
if __name__ == '__main__':
set_start_method("spawn")
main()
我目前无法在 mac 上对其进行测试,因此它可能无法开箱即用,但不应出现与 multiprocessing
相关的错误。一些注意事项:
cv2.VideoCapture
对象并抓住相机,因为只有一个进程应该从相机中读取数据。
也许您的第一个带有fork
的程序中的问题只是由于共享cv2.VideoCapture
而在stream
函数中重新创建它可以解决您的问题。
您不能将 numpy 包装器传递给孩子,因为它不会共享 mp.Array
缓冲区(这真的很奇怪,我花了一段时间才弄清楚)。您需要明确传递 Array
并重新创建一个包装器。
也许您的第一个带有fork
的程序中的问题只是由于共享cv2.VideoCapture
而在stream
函数中重新创建它可以解决您的问题。
我假设您在 python3.4+
中运行您的代码,所以我没有使用 billard
,但使用 forking_enabled(False)
而不是 set_start_method
应该有点相似。
让我知道这是否可行!
【讨论】:
您好 Thomas,感谢您的回答。我实际上使用的是 Python 2.7,因此我尝试使用forking_enable(0)
调整您的代码,但随后出现错误消息 RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance
。知道如何解决吗?
这真的很奇怪,因为这段代码通过继承共享SynchronizedArray
(当您使用Process
args 传递它时)。您可以尝试使用来自 billard 的 set_start_method
,使用 spawn
。如果这不起作用,你可以试试这个库loky
which backport spawn
for python2.7(免责声明,我是这个库的维护者)
经过一番思考:这很正常。由于forking_enable
,有一些奇怪的行为。您应该使用set_start_method
(存在于billard
)获得更好的结果,因为它将使用与您的Process
兼容的Array
的实现。这是由于上下文管理。当您对 Array
使用 vanilla 导入时,您无法控制正在使用的实现。你也可以创建一个上下文ctx = get_context('spawn')
,然后使用ctx.Array
、ctx.Value
和ctx.Process
来确保你有兼容的对象。
我想我尝试了你所有的建议。我最终得到了这个:pastebin.com/1tvRBqSF,但我现在收到一条错误消息:AttributeError: 'module' object has no attribute 'stream'
,然后它以段错误结束。我在粘贴中包含了完整的错误。 (哦,Loky 看起来是一个非常令人印象深刻的库,虽然对于我的 Python 知识来说使用它有点太多了)
这看起来像是billiard
中的pickler 实现中的一些错误。在我看来,最好切换到 python3(使用multiprocessing
时有很多好处)并使用默认库。此外,另一个答案是使用生产者/消费者逻辑和管道进行通信。这简化了逻辑,避免共享不必要的资源。我仍然认为内存要求更高,但它确实不应该成为您的应用程序中的问题。【参考方案2】:
multiprocessing
的主要挑战是理解内存地址空间分离时的内存模型。
Python 使事情变得更加混乱,因为它抽象了许多这些方面,在几个看似无辜的 API 下隐藏了几种机制。
当你写这个逻辑时:
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()
...
# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start() # Launch capture process
您正在传递给 Process
stream_function
,它指的是 VideoCapture
类 (self.get_size
) 的内部组件,但更重要的是,它不能用作 top level function。
子进程将无法重新构造所需的对象,因为它接收到的只是一个函数的名称。它尝试在主模块中查找它,因此显示消息:
pickle.PicklingError: Can't pickle : it's not found as main.stream_function
子进程试图将主模块中的函数解析为main.stream_function
,查找失败。
我的第一个建议是更改您的逻辑,以便将返回 stream_function
的方法传递给子进程。
video_process = Process(target=cap.get_stream_function, args=(...))
但您可能仍然会遇到问题,因为您在两个进程之间是sharing state。
当人们在 Python 中处理多处理范例时,我通常建议他们将进程视为运行在独立的机器中。在这些情况下,很明显您的架构存在问题。
我建议您将两个进程的职责分开,确保一个进程(子进程)处理整个视频捕获,而另一个进程(父进程或第三个进程)处理帧。
这种范例被称为Producer and Consumer Problem,它非常适合您的系统。视频捕获过程将是生产者,另一个是消费者。一个简单的multiprocessing.Pipe
或multiprocessing.Queue
将确保帧一旦准备好就从生产者传输到消费者。
添加一个伪代码示例,因为我不知道视频捕获 API。关键是在生产者过程中处理整个视频捕获逻辑,将其从消费者中抽象出来。消费者只需要知道它通过管道接收帧对象。
def capture_video(writer):
"""This runs in the producer process."""
# The VideoCapture class wraps the video acquisition logic
cap = VideoCapture()
while True:
frame = cap.get_next_frame() # the method returns the next frame
writer.send(frame) # send the new frame to the consumer process
def main():
reader, writer = multiprocessing.Pipe(False)
# producer process
video_process = Process(target=capture_video, args=[writer])
video_process.start() # Launch capture process
while True:
try:
frame = reader.recv() # receive next frame from the producer
process_frame(frame)
except KeyboardInterrupt:
video_process.terminate()
break
注意进程之间没有共享状态(无需共享任何数组)。通信通过管道并且是单向的,使得逻辑非常简单。正如我上面所说,这种逻辑也适用于不同的机器。您只需要用插座替换管道。
您可能希望为生产者进程提供更清晰的终止方法。我建议您使用multiprocessing.Event
。只需从 KeyboardInterrupt
块中的父级设置它,并在每次迭代时检查其在子级中的状态 (while not event.is_set()
)。
【讨论】:
编辑了一点答案,希望听起来更清楚。 感谢您的回答。但是为什么这在 Linux 上运行得很好呢? Apple fork manpage,请查看 CAVEATS 章节。不同操作系统实现进程创建和管理的方式存在核心差异。因此,如果可移植性是您的应用程序的一个重要方面,那么尽可能坚持已知的设计模式并保持简单是至关重要的。 也许有效,但工作缓慢。将非块流转换为图像,大多数 CMOS 传感器不允许特定读取。 主要是OP不需要跨多个进程共享视频捕获逻辑。 P&C 最适合这些类型的问题。它是标准的,可以很好地定义和隔离职责,并使逻辑非常简单。在这种情况下,内存共享完全是多余的,并且使逻辑错误且难以理解。因此出现了上述混淆。以上是关于如何在 OSX 上的单独进程中读取网络摄像头?的主要内容,如果未能解决你的问题,请参考以下文章
如何通过 Javascript 在 PythonAnywhere 上的 OpenCV 中访问网络摄像头?