在将结果附加到字典的 for 循环上使用 python 多处理
Posted
技术标签:
【中文标题】在将结果附加到字典的 for 循环上使用 python 多处理【英文标题】:Using python multiprocessing on a for loop that appends results to dictionary 【发布时间】:2020-05-20 02:54:15 【问题描述】:因此,我查看了多处理模块的文档以及此处提出的其他问题,但似乎没有一个与我的情况相似,因此我提出了一个新问题。
为简单起见,我有一段代码如下:
# simple dataframe of some users and their properties.
data = 'userId': [1, 2, 3, 4],
'property': [12, 11, 13, 43]
df = pd.DataFrame.from_dict(data)
# a function that generates permutations of the above users, in the form of a list of lists
# such as [[1,2,3,4], [2,1,3,4], [2,3,4,1], [2,4,1,3]]
user_perm = generate_permutations(nr_perm=4)
# a function that computes some relation between users
def comp_rel(df, permutation, user_dict):
df1 = df.userId.isin(permutation[0])
df2 = df.userId.isin(permutation[1])
user_dict[permutation[0]] += permutation[1]
return user_dict
# and finally a loop:
user_dict = defaultdict(int)
for permutation in user_perm:
user_dict = comp_rel(df, permutation, user_dict)
我知道这段代码现在没有什么意义(如果有的话),但我只是写了一个小例子,它与我正在处理的实际代码的结构很接近。 user_dict
最终应该包含 userIds
和一些值。
我有实际的代码,它工作正常,给出了正确的 dict 和所有内容,但是......它在单个线程上运行。而且它非常缓慢,请记住我还有另外 15 个线程完全免费。
我的问题是,如何使用 python 的 multiprocessing
模块来更改最后一个 for 循环,并能够在所有可用的线程/内核上运行?看了文档,不是很容易理解。
编辑:我正在尝试将池用作:
p = multiprocessing.Pool(multiprocessing.cpu_count())
p.map(comp_rel(df, permutation, user_dict), user_perm)
p.close()
p.join()
但是这会中断,因为我正在使用该行:
user_dict = comp_rel(df, permutation, user_dict)
在初始代码中,我不知道池完成后应该如何合并这些字典。
【问题讨论】:
你肯定需要了解GIL。 @OlvinRoght 我知道有一些锁,但这也说明:但是,一些扩展模块,无论是标准的还是第三方的,都被设计为在执行计算密集型任务时释放 GIL例如压缩或散列。多处理似乎就是这样一个模块。 @GPhilo,据我所知,我的机器有 4 个内核,每个内核有 4 个线程。如果我使用 htop,我会看到 16 个空闲的“线程”。我们说的是线程还是内核? 多处理模块中的示例展示了如何做到这一点:docs.python.org/3/library/… 您可以使用池来触发comp_rel
的每个调用。由于您要启动多个 python 进程,因此 GIL 不会成为问题。
【参考方案1】:
comp_rel
有两部分需要分开 - 首先是计算本身,它正在计算某些用户 ID 的一些值。第二个是“累积”步骤,该步骤将该值添加到user_dict
结果中。
您可以将计算本身分离,使其返回一个 (id, value)
的元组并通过多处理将其分流,然后在主线程中随后累积结果:
from multiprocessing import Pool
from functools import partial
from collections import defaultdict
# We make this a pure function that just returns a result instead of mutating anything
def comp_rel(df, perm):
...
return perm[0], perm[1]
comp_with_df = partial(comp_rel, df) # df is always the same, so factor it out
with Pool(None) as pool: # Pool(None) uses cpu_count automatically
results = pool.map(comp_with_df, user_perm)
# Now add up the results at the end:
user_dict = defaultdict(int)
for k, v in results:
user_dict[k] += v
或者,您也可以将Manager().dict()
对象直接传递给处理函数,但这有点复杂,并且可能不会为您带来任何额外的速度。
根据@Masklinn 的建议,这里有一个更好的方法来避免内存开销:
user_dict = defaultdict(int)
with Pool(None) as pool:
for k, v in pool.imap_unordered(comp_with_df, user_perm):
user_dict[k] += v
这样,我们在结果完成时将它们加起来,而不必先将它们全部存储在中间列表中。
【讨论】:
另外,由于结果的顺序似乎根本不重要,您可能想使用imap_unordered
,并在池中进行累积。这样您就可以在结果生成时使用它们,并且 python 不需要大的重新排序缓冲区来按顺序返回元素。
@Masklinn ProcessPoolExecutor
和 as_completed()
可能是一个更“好”的选项。
这似乎是一个低级的过程:使用 imap_unordered 有一次用户空间代码,使用 as_completed 你必须首先提交所有任务,可能跟踪返回的期货,然后处理您从 as_completed 获得的期货。
"实际上一共是两行代码。"这是 imap_unordered 的 LOC 的两倍。 “而且它不需要像 partitial() 这样的‘hacks’” partial 是一个从字面上没有意义的 hack。 “并且您可以在处理完成后立即使用结果”所以......就像 imap_unordered 一样,但仍然没有那么好?
@Quubix 这个答案的重点是不让user_dict
成为一个论点。您只返回每个单独计算的结果,然后在主线程中创建字典。【参考方案2】:
在 cmets 进行简短讨论后,我决定使用 ProcessPoolExecutor
发布解决方案:
import concurrent.futures
from collections import defaultdict
def comp_rel(df, perm):
...
return perm[0], perm[1]
user_dict = defaultdict(int)
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = executor.submit(comp_rel, df, perm): perm for perm in user_perm
for future in concurrent.futures.as_completed(futures):
try:
k, v = future.result()
except Exception as e:
print(f"futures[future] throws e")
else:
user_dict[k] += v
它与@tzaman 的工作方式相同,但它使您可以处理异常。此模块还有更多有趣的功能,请查看docs。
【讨论】:
throws comp_rel() 缺少 1 个必需的位置参数:'user_dict',有没有办法像原始代码一样在那里输入用户字典? @Quubix,引用自文档“如果max_workers
是 None
或未给出,它将默认为机器上的处理器数量。”
谢谢,但我的问题是 comp_rel 需要使用 df、perm 和 user_dict 参数。如何在 executor.submit 行中添加它?
@Quubix,只需再添加一个位置参数executor.submit(comp_rel, df, perm, user_dict)
。
@Quubix, future.result()
返回与comp_rel()
完全相同。在示例函数中返回 2 个值,这就是我将其解包 k, v = future.result()
的原因。如果你的函数有不同的回报 - 你应该修补代码以上是关于在将结果附加到字典的 for 循环上使用 python 多处理的主要内容,如果未能解决你的问题,请参考以下文章
将 for 循环的结果附加到 Laravel 中的 eloquent 输出