Python 多进程读取输入并调用昂贵的模型
Posted
技术标签:
【中文标题】Python 多进程读取输入并调用昂贵的模型【英文标题】:Python multiple processes to read input and call an expensive model 【发布时间】:2021-12-15 16:44:00 【问题描述】:我有一个具有 2 个功能的设置,像这样。
def fun1(input_stream, output_stream):
batch_data = []
#read input line by line and construct a batch of size batch_size
for line in input_stream:
batch_data.append(process(line))
if len(batch_data) == batch_size:
batch_results = fun2(batch_data)
#write results line by line to output stream
batch_data = []
def fun2(batch_data):
# call an expensive model and return the response
return process(expensive_call(batch_data))
在设置中,外部调用者调用fun1
。 fun2
正在等待从 fun1
获取批次,当模型被调用时,fun1
正在等待。
我的第一个直觉是看看我们是否可以使用多处理将fun1
和fun2
分成两个进程。 fun1
不断写入最大大小的队列(例如,batch_size * 5
),每当 fun2
空闲时,它会处理队列中可用的任何内容(如果有完整批次或更多可用,则读取一个批次。否则,读取任何可用的内容。)
我在 python 方面经验丰富,但从未使用过多处理/多线程。在 python 中执行此操作的最佳方法是什么?使用多处理/多线程会更好,有什么区别?
另外,异步写入output_stream
是个好主意吗?
还有其他方法可以加快速度吗?
【问题讨论】:
【参考方案1】:我会将函数 func
变成一个生成器函数,yields 它的批次,并且可以用作 iterable 与 multiprocessing.Pool.imap
或 @ 一起使用multiprocessing.Pool
的 987654323@ 方法(请参阅代码 cmets 了解区别)。与 map
相比,这些方法允许您在最终结果可用时对其进行处理,map
在处理完所有批次之前不会返回。
from multiprocessing import Pool
def fun1(input_stream, output_stream):
batch_data = []
#read input line by line and construct a batch of size batch_size
for line in input_stream:
batch_data.append(process_line(line))
if len(batch_data) == batch_size:
yield batch_data
batch_data = []
# The possibility exists (no?) that input is not a multiple of batch_size, so:
if batch_data:
yield batch_data
def fun2(batch_data):
# call an expensive model and return the response
return process(expensive_call(batch_data))
def main():
pool = Pool()
# The iterable, i.e. the fun1 generator function can be lazily evalulated:
results = pool.imap(fun2, fun1(input_stream, output_stream))
# Iterate the results from fun2 as they become available.
# Substitute pool.imap_unordered for pool.imap if you are willing to have
# the results returned in completion order rather than task-submission order.
# imap_unordered can be slightly more efficient.
for result in results:
... # do something with the return value from
# Required for Windows:
if __name__ == '__main__':
main()
【讨论】:
以上是关于Python 多进程读取输入并调用昂贵的模型的主要内容,如果未能解决你的问题,请参考以下文章