在 Python 中分析来自连续输入流(记录)的数据,多处理?

Posted

技术标签:

【中文标题】在 Python 中分析来自连续输入流(记录)的数据,多处理?【英文标题】:Analyzing data from continuous input stream (recording) in Python, multiprocessing? 【发布时间】:2016-08-16 17:22:50 【问题描述】:

我正在实时分析通过麦克风录制的数据。到目前为止,我一直以线性方式执行此操作:

录制一秒(需要 1 秒) 分析数据(例如 50 毫秒) 录制一秒 分析数据

等等。这显然意味着,当我分析过去一秒的数据时,我失去了这 50 毫秒的时间,我不会在此期间记录声音。

我认为多处理将是解决方案:我启动一个单独的进程,以特定长度的块不间断地记录,并且每次都通过管道将其发送到主进程,然后主进程分析数据。不幸的是,通过管道发送大量数据(或者通常,将大量数据从一个进程发送到另一个进程)显然远非理想。有没有其他方法可以做到这一点?我只希望我的计算机记录数据并将其导入 python(我已经在做所有这些),同时它也在分析数据。

如果我需要添加更多详细信息,请告诉我!

谢谢!

【问题讨论】:

你的一些实际代码会很有用。 【参考方案1】:

简单的生产者/消费者实现。

虽然确实来回移动数据会导致开销并增加内存使用,但只要多个进程不需要相同的数据,开销就会最小。试试看:) 可以通过更改队列和池大小数字来调整内存占用。

线程是减少内存使用的另一种选择,但代价是在 GIL 上被阻塞并且如果处理是在 python 字节码中有效地单线程。

import multiprocessing
# Some fixed size to avoid run away memory use
recorded_data = multiprocessing.Queue(100)

def process(recorded_data):
    while True:
        data = recorded_data.get()
        <process data>

def record(recorded_data):
    for data in input_stream:
        recorded_data.put(data)

producer = multiprocessing.Process(target=record, 
                                   args=(recorded_data,))
producer.start()

# Pool of 10 processes
num_proc = 10
consumer_pool = multiprocessing.Pool(num_proc)
results = []
for _ in xrange(num_proc):
    results.append(
      consumer_pool.apply_async(process,
                                args=(recorded_data,)))

producer.join()

# If processing actually returns something
for result in results:
    print result
# Consumers wait for data from queue forever
#  so terminate them when done
consumer_pool.terminate()

【讨论】:

谢谢!我尝试了类似的方法,但以 process() 作为主要进程。由于某种原因,将数据从 record() 进程推送到主进程很慢(我使用的是管道,因为我显然只有一个 process() 进程要发送到。管道会不会是个问题?) .我将再次考虑您的实施情况,看看解决问题的方法。 :) 是的,multiprocess.pipe 会比使用共享队列慢。使用管道,对象每次都需要序列化和反序列化,并通过网络接口传递,这有其自身的开销。序列化仍然需要使用队列进行,这就是为什么不能将原生对象放在 multiprocessing.Queue 上(但可以使用 threading 模块)但不存在网络开销。

以上是关于在 Python 中分析来自连续输入流(记录)的数据,多处理?的主要内容,如果未能解决你的问题,请参考以下文章

从FFmpeg输出日志中分析问题原因——记一次输出流顺序异常

从FFmpeg输出日志中分析问题原因——记一次输出流顺序异常

从FFmpeg输出日志中分析问题原因——记一次输出流顺序异常

从FFmpeg输出日志中分析问题原因——记一次输出流顺序异常

从FFmpeg输出日志中分析问题原因——记一次输出流顺序异常

在 python 中分析自我和参数?