以追加到列表为结果的多处理 for 循环

Posted

技术标签:

【中文标题】以追加到列表为结果的多处理 for 循环【英文标题】:Multiprocessing for-loop with append-to-list as result 【发布时间】:2019-11-09 13:41:48 【问题描述】:

我需要并行化一个 for 循环。我当前的代码是循环遍历我从 xarray 数据集获取的 id 列表,从 xarray 数据集中获取具有相应 id 的行数据,调用函数(计算数据的三角分布),附加结果分布函数转换为列表,一旦完成,它将列表转换为 xarray 数据集,其中每个结果都链接到当前的 id,因此稍后可以通过 ID 将这个数据集附加到“主”数据集。

我的代码看起来有点像这样:

from sklearn.preprocessing import MinMaxScaler
import xarray as xr
import scipy.stats as st

function call_func(data):
   scaler = MinMaxScaler()
   norm_data = scaler.fit_transform(np.reshape(data, (len(data),1)))
   params = st.triang.fit(norm_data)
   arg,loc,scale = params[:-2],params[-2],params[-1]
   dist = st.triang(loc=loc, scale=scale, *arg)
   return dist

if __name__ == "__main__":
for id in my_dataset['id'].values:
        row_data= my_dataset.sel(id=id)['data'].values[0]
        if len(row_data)>3 and all(row_data== 0) == False:
                result = call_func(row_data)
                result_list.append(result)
        else:
            result_list.append([])

new_dataset = xr.Dataset('id': my_dataset['id'].values,
                          'dist_data':(['id','dist'],
                           np.reshape(np.array(result_list),(len(result_list),1)))
                           )

由于 id_array 很大,我想并行化循环。这是一个通用问题,但是我是多处理工具的新手。您对如何将多处理与此任务结合起来有什么建议吗?我的研究表明,多处理和附加到列表并不是最明智的做法。

【问题讨论】:

【参考方案1】:

我将尝试给出一个简单的虚拟示例,希望您可以推断出您的代码所需的修改:

这是一个常规循环版本的代码:

id_array = [*range(10)]

result = []
for id in id_array:
    if id % 2 == 0:
        result.append((id, id))
    else:
        result.append((id, id ** 2))

print(result)

输出:

[(0, 0), (1, 1), (2, 2), (3, 9), (4, 4), (5, 25), (6, 6), (7, 49 ), (8, 8), (9, 81)]


在这里,使用ProcessPoolExecutor,我将它并行化到 4 个进程:

from concurrent.futures import ProcessPoolExecutor

id_array = [*range(10)]


def myfunc(id):
    if id % 2 == 0:
        return id, id
    else:
        return id, id ** 2


result = []
with ProcessPoolExecutor(max_workers=4) as executor:
    for r in executor.map(myfunc, id_array):
        result.append(r)

print(result)

输出(同):

[(0, 0), (1, 1), (2, 2), (3, 9), (4, 4), (5, 25), (6, 6), (7, 49 ), (8, 8), (9, 81)]


基本上:

    for 内容提取到返回所需值的函数中 使用ProcessPoolExecutorexecutor.map(myfunc, id_array) 将返回的值附加到您的结果列表中。

【讨论】:

感谢您的建议。生病尝试实施它。但是,在 jupyter 中尝试您的代码时,它以以下错误消息结束:BrokenProcessPool:进程池中的进程在未来运行或挂起时突然终止。 也许这个关于在交互式 shell 中运行 ProcessPoolExecutor 的答案可以提供帮助:***.com/a/15918742/5052365

以上是关于以追加到列表为结果的多处理 for 循环的主要内容,如果未能解决你的问题,请参考以下文章

带计数器的多处理嵌套 for 循环

列表推导式和for循环执行效率对比

在 Python For 循环中追加和弹出项目 [重复]

python 利用 for ... else 跳出双层嵌套循环

python 利用 for ... else 跳出双层嵌套循环

为啥我的 for 循环覆盖而不是追加?