在 Python 中分析来自连续输入流(记录)的数据,多处理?
Posted
技术标签:
【中文标题】在 Python 中分析来自连续输入流(记录)的数据,多处理?【英文标题】:Analyzing data from continuous input stream (recording) in Python, multiprocessing? 【发布时间】:2016-08-16 17:22:50 【问题描述】:我正在实时分析通过麦克风录制的数据。到目前为止,我一直以线性方式执行此操作:
录制一秒(需要 1 秒) 分析数据(例如 50 毫秒) 录制一秒 分析数据等等。这显然意味着,当我分析过去一秒的数据时,我失去了这 50 毫秒的时间,我不会在此期间记录声音。
我认为多处理将是解决方案:我启动一个单独的进程,以特定长度的块不间断地记录,并且每次都通过管道将其发送到主进程,然后主进程分析数据。不幸的是,通过管道发送大量数据(或者通常,将大量数据从一个进程发送到另一个进程)显然远非理想。有没有其他方法可以做到这一点?我只希望我的计算机记录数据并将其导入 python(我已经在做所有这些),同时它也在分析数据。
如果我需要添加更多详细信息,请告诉我!
谢谢!
【问题讨论】:
你的一些实际代码会很有用。 【参考方案1】:简单的生产者/消费者实现。
虽然确实来回移动数据会导致开销并增加内存使用,但只要多个进程不需要相同的数据,开销就会最小。试试看:) 可以通过更改队列和池大小数字来调整内存占用。
线程是减少内存使用的另一种选择,但代价是在 GIL 上被阻塞并且如果处理是在 python 字节码中有效地单线程。
import multiprocessing
# Some fixed size to avoid run away memory use
recorded_data = multiprocessing.Queue(100)
def process(recorded_data):
while True:
data = recorded_data.get()
<process data>
def record(recorded_data):
for data in input_stream:
recorded_data.put(data)
producer = multiprocessing.Process(target=record,
args=(recorded_data,))
producer.start()
# Pool of 10 processes
num_proc = 10
consumer_pool = multiprocessing.Pool(num_proc)
results = []
for _ in xrange(num_proc):
results.append(
consumer_pool.apply_async(process,
args=(recorded_data,)))
producer.join()
# If processing actually returns something
for result in results:
print result
# Consumers wait for data from queue forever
# so terminate them when done
consumer_pool.terminate()
【讨论】:
谢谢!我尝试了类似的方法,但以 process() 作为主要进程。由于某种原因,将数据从 record() 进程推送到主进程很慢(我使用的是管道,因为我显然只有一个 process() 进程要发送到。管道会不会是个问题?) .我将再次考虑您的实施情况,看看解决问题的方法。 :) 是的,multiprocess.pipe 会比使用共享队列慢。使用管道,对象每次都需要序列化和反序列化,并通过网络接口传递,这有其自身的开销。序列化仍然需要使用队列进行,这就是为什么不能将原生对象放在 multiprocessing.Queue 上(但可以使用 threading 模块)但不存在网络开销。以上是关于在 Python 中分析来自连续输入流(记录)的数据,多处理?的主要内容,如果未能解决你的问题,请参考以下文章
从FFmpeg输出日志中分析问题原因——记一次输出流顺序异常
从FFmpeg输出日志中分析问题原因——记一次输出流顺序异常
从FFmpeg输出日志中分析问题原因——记一次输出流顺序异常
从FFmpeg输出日志中分析问题原因——记一次输出流顺序异常