在这种情况下,为啥串行代码比 concurrent.futures 快?
Posted
技术标签:
【中文标题】在这种情况下,为啥串行代码比 concurrent.futures 快?【英文标题】:Why serial code is faster than concurrent.futures in this case?在这种情况下,为什么串行代码比 concurrent.futures 快? 【发布时间】:2022-01-10 12:16:33 【问题描述】:我正在使用以下代码为我的 ML 项目处理一些图片,我想将其并行化。
import multiprocessing as mp
import concurrent.futures
def track_ids(seq):
'''The func is so big I can not put it here'''
ood =
for i in seq:
# I load around 500 images and process them
ood[i] = some Value
return ood
seqs = []
for seq in range(1, 10):# len(seqs)+1):
seq = txt+str(seq)
seqs.append(seq)
# serial call of the function
track_ids(seq)
#parallel call of the function
with concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count()) as ex:
ood_id = ex.map(track_ids, seqs)
如果我串行运行代码需要3.0
分钟,但对于并发并行运行,需要3.5
分钟。
有人可以解释这是为什么吗?并提出解决问题的方法。
顺便说一句,我有 12 个内核。 谢谢
【问题讨论】:
很难说...代码的结构没有明显的问题。每当“速度”这个词从你嘴里说出来时,你就应该进行分析。不可否认,分析多进程代码比单进程更困难,但有很多工具可供使用。 【参考方案1】:下面是一个简要示例,说明如何分析多处理代码与串行执行:
from multiprocessing import Pool
from cProfile import Profile
from pstats import Stats
import concurrent.futures
def track_ids(seq):
'''The func is so big I can not put it here'''
ood =
for i in seq:
# I load around 500 images and process them
ood[i] = some Value
return ood
def profile_seq():
p = Profile() #one and only profiler instance
p.enable()
seqs = []
for seq in range(1, 10):# len(seqs)+1):
seq = txt+str(seq)
seqs.append(seq)
# serial call of the function
track_ids(seq)
p.disable()
return Stats(p), seqs
def track_ids_pr(seq):
p = Profile() #profile the child tasks
p.enable()
retval = track_ids(seq)
p.disable()
return (Stats(p, stream="dummy"), retval)
def profile_parallel():
p = Profile() #profile stuff in the main process
p.enable()
with concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count()) as ex:
retvals = ex.map(track_ids_pr, seqs)
p.disable()
s = Stats(p)
out = []
for ret in retvals:
s.add(ret[0])
out.append(ret[1])
return s, out
if __name__ == "__main__":
stat, retval = profile_parallel()
stat.print_stats()
编辑:不幸的是,我发现pstat.Stats
对象不能与multiprocessing.Queue
一起正常使用,因为它不可腌制(concurrent.futures
的操作需要它)。显然,它通常会存储对文件的引用,以便将统计信息写入该文件,如果没有给出,它将默认获取对 sys.stdout
的引用。然而,在我们真正想要打印统计数据之前,我们实际上并不需要该引用,因此我们可以给它一个临时值以防止 pickle 错误,然后稍后恢复一个适当的值。下面的示例应该可以复制粘贴并且运行良好,而不是上面的伪代码示例。
from multiprocessing import Queue, Process
from cProfile import Profile
from pstats import Stats
import sys
def isprime(x):
for d in range(2, int(x**.5)):
if x % d == 0:
return False
return True
def foo(retq):
p = Profile()
p.enable()
primes = []
max_n = 2**20
for n in range(3, max_n):
if isprime(n):
primes.append(n)
p.disable()
retq.put(Stats(p, stream="dummy")) #Dirty hack: set `stream` to something picklable then override later
if __name__ == "__main__":
q = Queue()
p1 = Process(target=foo, args=(q,))
p1.start()
p2 = Process(target=foo, args=(q,))
p2.start()
s1 = q.get()
s1.stream = sys.stdout #restore original file
s2 = q.get()
# s2.stream #if we are just adding this `Stats` object to another the `stream` just gets thrown away anyway.
s1.add(s2) #add up the stats from both child processes.
s1.print_stats() #s1.stream gets used here, but not before. If you provide a file to write to instead of sys.stdout, it will write to that file)
p1.join()
p2.join()
【讨论】:
分析的目标是希望确定代码中的瓶颈所在。对我来说,您可能已经将存储所有图像的硬盘驱动器最大化,在这种情况下,您只是在已经受限的系统中增加了开销。剖析将帮助您确定但滞留在哪里 我在分析时收到以下错误,来自for ret in retvals: etc.
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
@Dariyoush 我不测试不好。不幸的是,pstat.Stats
通常不可提取,这是从子进程发送回结果所必需的。典型的解决方案(据我从分析文档中可以理解)是将统计信息从子进程写入文件,然后重新打开这些文件以将它们加载到主进程中......或者,我们可以给出Stats
构造函数是一个可提取的假文件流,因为它实际上并不需要它。以上是关于在这种情况下,为啥串行代码比 concurrent.futures 快?的主要内容,如果未能解决你的问题,请参考以下文章
为啥我使用 openMP atomic 的并行代码比串行代码花费更长的时间?
Python 没有接收到来自 Arduino Mega 2560 的第一行串行数据,而是接收到所有后续数据,为啥会发生这种情况?