将多线程和多处理与 concurrent.futures 相结合

Posted

技术标签:

【中文标题】将多线程和多处理与 concurrent.futures 相结合【英文标题】:Combining multithreading and multiprocessing with concurrent.futures 【发布时间】:2020-11-28 02:11:10 【问题描述】:

我有一个高度依赖 I/O 和 CPU 密集型的函数。我试图通过多处理和多线程来并行化它,但它被卡住了。这个问题was asked 之前但在不同的设置中。我的函数是完全独立的,什么也不返回。为什么卡住了?怎么解决?

import concurrent.futures
import os
import numpy as np
import time


ids = [1,2,3,4,5,6,7,8]

def f(x):
    time.sleep(1)
    x**2

def multithread_accounts(AccountNumbers, f, n_threads = 2):

    slices = np.array_split(AccountNumbers, n_threads)
    slices = [list(i) for i in slices]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(f, slices)



def parallelize_distribute(AccountNumbers, f, n_threads = 2, n_processors = os.cpu_count()):

    slices = np.array_split(AccountNumbers, n_processors)
    slices = [list(i) for i in slices]

    with concurrent.futures.ProcessPoolExecutor(max_workers=n_processors) as executor:
        executor.map( lambda x: multithread_accounts(x, f, n_threads = n_threads) , slices)
        
parallelize_distribute(ids, f, n_processors=2, n_threads=2)

【问题讨论】:

【参考方案1】:

抱歉,我没时间解释这一切,所以我只给出“有效”的代码。我敦促您从更简单的东西开始,因为学习曲线并非易事。首先将 numpy 排除在外;一开始坚持only 线程;然后移动到 only 进程;除非您是专家,否则不要尝试并行化命名模块级函数以外的任何内容(不,不是函数局部匿名 lambda)。

正如经常发生的那样,您“应该”收到的错误消息会被抑制,因为它们是异步发生的,因此没有好的方法来报告它们。随意添加print() 语句,看看你能走多远。

注意:我去掉了 numpy,并添加了所需的东西,以便它也可以在 Windows 上运行。我希望使用 numpy 的 array_split() 可以正常工作,但我当时使用的机器上没有 numpy。

import concurrent.futures as cf
import os
import time

def array_split(xs, n):
    from itertools import islice
    it = iter(xs)
    result = []
    q, r = divmod(len(xs), n)
    for i in range(r):
        result.append(list(islice(it, q+1)))
    for i in range(n - r):
        result.append(list(islice(it, q)))
    return result
    
ids = range(1, 11)

def f(x):
    print(f"called with x")
    time.sleep(5)
    x**2

def multithread_accounts(AccountNumbers, f, n_threads=2):
    with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
        for slice in array_split(AccountNumbers, n_threads):
            executor.map(f, slice)

def parallelize_distribute(AccountNumbers, f, n_threads=2, n_processors=os.cpu_count()):
    slices = array_split(AccountNumbers, n_processors)
    print("top slices", slices)
    with cf.ProcessPoolExecutor(max_workers=n_processors) as executor:
        executor.map(multithread_accounts, slices,
                                           [f] * len(slices),
                                           [n_threads] * len(slices))

if __name__ == "__main__":
    parallelize_distribute(ids, f, n_processors=2, n_threads=2)

顺便说一句,我建议这对线程部分更有意义:

def multithread_accounts(AccountNumbers, f, n_threads=2):
    with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
        executor.map(f, AccountNumbers)

也就是说,这里真的没有必要自己拆分列表 - 线程机器会自己拆分它。您可能在最初的尝试中错过了这一点,因为您发布的代码中的 ThreadPoolExecutor() 调用忘记指定 max_workers 参数。

【讨论】:

以上是关于将多线程和多处理与 concurrent.futures 相结合的主要内容,如果未能解决你的问题,请参考以下文章

带返回值的线程创建方式

python concurrent.futures

Java 线程池抛 RejectedExecutionException 异常时 active threads 值分析

Java 线程池抛 RejectedExecutionException 异常时 active threads 值分析

Java线程池工具类

多线程下载图片