将多线程和多处理与 concurrent.futures 相结合
Posted
技术标签:
【中文标题】将多线程和多处理与 concurrent.futures 相结合【英文标题】:Combining multithreading and multiprocessing with concurrent.futures 【发布时间】:2020-11-28 02:11:10 【问题描述】:我有一个高度依赖 I/O 和 CPU 密集型的函数。我试图通过多处理和多线程来并行化它,但它被卡住了。这个问题was asked 之前但在不同的设置中。我的函数是完全独立的,什么也不返回。为什么卡住了?怎么解决?
import concurrent.futures
import os
import numpy as np
import time
ids = [1,2,3,4,5,6,7,8]
def f(x):
time.sleep(1)
x**2
def multithread_accounts(AccountNumbers, f, n_threads = 2):
slices = np.array_split(AccountNumbers, n_threads)
slices = [list(i) for i in slices]
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(f, slices)
def parallelize_distribute(AccountNumbers, f, n_threads = 2, n_processors = os.cpu_count()):
slices = np.array_split(AccountNumbers, n_processors)
slices = [list(i) for i in slices]
with concurrent.futures.ProcessPoolExecutor(max_workers=n_processors) as executor:
executor.map( lambda x: multithread_accounts(x, f, n_threads = n_threads) , slices)
parallelize_distribute(ids, f, n_processors=2, n_threads=2)
【问题讨论】:
【参考方案1】:抱歉,我没时间解释这一切,所以我只给出“有效”的代码。我敦促您从更简单的东西开始,因为学习曲线并非易事。首先将 numpy 排除在外;一开始坚持only 线程;然后移动到 only 进程;除非您是专家,否则不要尝试并行化命名模块级函数以外的任何内容(不,不是函数局部匿名 lambda)。
正如经常发生的那样,您“应该”收到的错误消息会被抑制,因为它们是异步发生的,因此没有好的方法来报告它们。随意添加print()
语句,看看你能走多远。
注意:我去掉了 numpy,并添加了所需的东西,以便它也可以在 Windows 上运行。我希望使用 numpy 的 array_split()
可以正常工作,但我当时使用的机器上没有 numpy。
import concurrent.futures as cf
import os
import time
def array_split(xs, n):
from itertools import islice
it = iter(xs)
result = []
q, r = divmod(len(xs), n)
for i in range(r):
result.append(list(islice(it, q+1)))
for i in range(n - r):
result.append(list(islice(it, q)))
return result
ids = range(1, 11)
def f(x):
print(f"called with x")
time.sleep(5)
x**2
def multithread_accounts(AccountNumbers, f, n_threads=2):
with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
for slice in array_split(AccountNumbers, n_threads):
executor.map(f, slice)
def parallelize_distribute(AccountNumbers, f, n_threads=2, n_processors=os.cpu_count()):
slices = array_split(AccountNumbers, n_processors)
print("top slices", slices)
with cf.ProcessPoolExecutor(max_workers=n_processors) as executor:
executor.map(multithread_accounts, slices,
[f] * len(slices),
[n_threads] * len(slices))
if __name__ == "__main__":
parallelize_distribute(ids, f, n_processors=2, n_threads=2)
顺便说一句,我建议这对线程部分更有意义:
def multithread_accounts(AccountNumbers, f, n_threads=2):
with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
executor.map(f, AccountNumbers)
也就是说,这里真的没有必要自己拆分列表 - 线程机器会自己拆分它。您可能在最初的尝试中错过了这一点,因为您发布的代码中的 ThreadPoolExecutor()
调用忘记指定 max_workers
参数。
【讨论】:
以上是关于将多线程和多处理与 concurrent.futures 相结合的主要内容,如果未能解决你的问题,请参考以下文章
Java 线程池抛 RejectedExecutionException 异常时 active threads 值分析