Python 多进程读取输入并调用昂贵的模型

Posted

技术标签:

【中文标题】Python 多进程读取输入并调用昂贵的模型【英文标题】:Python multiple processes to read input and call an expensive model 【发布时间】:2021-12-15 16:44:00 【问题描述】:

我有一个具有 2 个功能的设置,像这样。

def fun1(input_stream, output_stream):
    batch_data = []
    #read input line by line and construct a batch of size batch_size
    for line in input_stream:
        batch_data.append(process(line))
        if len(batch_data) == batch_size:
            batch_results = fun2(batch_data)
            #write results line by line to output stream
        batch_data = []

def fun2(batch_data):
    # call an expensive model and return the response
    return process(expensive_call(batch_data))

在设置中,外部调用者调用fun1fun2 正在等待从 fun1 获取批次,当模型被调用时,fun1 正在等待。

我的第一个直觉是看看我们是否可以使用多处理将fun1fun2 分成两个进程。 fun1 不断写入最大大小的队列(例如,batch_size * 5),每当 fun2 空闲时,它会处理队列中可用的任何内容(如果有完整批次或更多可用,则读取一个批次。否则,读取任何可用的内容。)

我在 python 方面经验丰富,但从未使用过多处理/多线程。在 python 中执行此操作的最佳方法是什么?使用多处理/多线程会更好,有什么区别? 另外,异步写入output_stream 是个好主意吗?

还有其他方法可以加快速度吗?

【问题讨论】:

【参考方案1】:

我会将函数 func 变成一个生成器函数,yields 它的批次,并且可以用作 iterablemultiprocessing.Pool.imap 或 @ 一起使用multiprocessing.Pool 的 987654323@ 方法(请参阅代码 cmets 了解区别)。与 map 相比,这些方法允许您在最终结果可用时对其进行处理,map 在处理完所有批次之前不会返回。

from multiprocessing import Pool

def fun1(input_stream, output_stream):
    batch_data = []
    #read input line by line and construct a batch of size batch_size
    for line in input_stream:
        batch_data.append(process_line(line))
        if len(batch_data) == batch_size:
            yield batch_data
        batch_data = []
    # The possibility exists (no?) that input is not a multiple of batch_size, so:
    if batch_data:
        yield batch_data

def fun2(batch_data):
    # call an expensive model and return the response
    return process(expensive_call(batch_data))

def main():
    pool = Pool()
    # The iterable, i.e. the fun1 generator function can be lazily evalulated:
    results = pool.imap(fun2, fun1(input_stream, output_stream))
    # Iterate the results from fun2 as they become available.
    # Substitute pool.imap_unordered for pool.imap if you are willing to have
    # the results returned in completion order rather than task-submission order.
    # imap_unordered can be slightly more efficient.
    for result in results:
        ... # do something with the return value from 
    
# Required for Windows:
if __name__ == '__main__':
    main()

【讨论】:

以上是关于Python 多进程读取输入并调用昂贵的模型的主要内容,如果未能解决你的问题,请参考以下文章

python 多进程

Python多进程 - subprocess & multiprocess

Python第四周之多线程和多进程

python 多线程

python -- 多进程

Linux系统中多线程实现方法的全面解析