在这种情况下,为啥串行代码比 concurrent.futures 快?

Posted

技术标签:

【中文标题】在这种情况下,为啥串行代码比 concurrent.futures 快?【英文标题】:Why serial code is faster than concurrent.futures in this case?在这种情况下,为什么串行代码比 concurrent.futures 快? 【发布时间】:2022-01-10 12:16:33 【问题描述】:

我正在使用以下代码为我的 ML 项目处理一些图片,我想将其并行化。

import multiprocessing as mp
import concurrent.futures

def track_ids(seq):
    '''The func is so big I can not put it here'''
    ood = 
    for i in seq:
        # I load around 500 images and process them
        ood[i] = some Value
    return ood

seqs = []
for seq in range(1, 10):# len(seqs)+1):
    seq = txt+str(seq)
    seqs.append(seq)
    # serial call of the function
    track_ids(seq)

#parallel call of the function
with concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count()) as ex:
    ood_id = ex.map(track_ids, seqs)

如果我串行运行代码需要3.0 分钟,但对于并发并行运行,需要3.5 分钟。 有人可以解释这是为什么吗?并提出解决问题的方法。

顺便说一句,我有 12 个内核。 谢谢

【问题讨论】:

很难说...代码的结构没有明显的问题。每当“速度”这个词从你嘴里说出来时,你就应该进行分析。不可否认,分析多进程代码比单进程更困难,但有很多工具可供使用。 【参考方案1】:

下面是一个简要示例,说明如何分析多处理代码与串行执行:

from multiprocessing import Pool
from cProfile import Profile
from pstats import Stats
import concurrent.futures

def track_ids(seq):
    '''The func is so big I can not put it here'''
    ood = 
    for i in seq:
        # I load around 500 images and process them
        ood[i] = some Value
    return ood

def profile_seq():
    p = Profile() #one and only profiler instance
    p.enable()
    seqs = []
    for seq in range(1, 10):# len(seqs)+1):
        seq = txt+str(seq)
        seqs.append(seq)
        # serial call of the function
        track_ids(seq)
    p.disable()
    return Stats(p), seqs


def track_ids_pr(seq):
    p = Profile() #profile the child tasks
    p.enable()
    
    retval = track_ids(seq)
    
    p.disable()
    return (Stats(p, stream="dummy"), retval)
    
def profile_parallel():
    p = Profile() #profile stuff in the main process
    p.enable()
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count()) as ex:
        retvals = ex.map(track_ids_pr, seqs)
        
    p.disable()
    s = Stats(p)
    
    out = []
    for ret in retvals:
        s.add(ret[0])
        out.append(ret[1])
        
    return s, out


if __name__ == "__main__":
    stat, retval = profile_parallel()
    stat.print_stats()

编辑:不幸的是,我发现pstat.Stats 对象不能与multiprocessing.Queue 一起正常使用,因为它不可腌制(concurrent.futures 的操作需要它)。显然,它通常会存储对文件的引用,以便将统计信息写入该文件,如果没有给出,它将默认获取对 sys.stdout 的引用。然而,在我们真正想要打印统计数据之前,我们实际上并不需要该引用,因此我们可以给它一个临时值以防止 pickle 错误,然后稍后恢复一个适当的值。下面的示例应该可以复制粘贴并且运行良好,而不是上面的伪代码示例。

from multiprocessing import Queue, Process
from cProfile import Profile
from pstats import Stats
import sys

def isprime(x):
    for d in range(2, int(x**.5)):
        if x % d == 0:
            return False
    return True

def foo(retq):
    p = Profile()
    p.enable()
    
    primes = []
    max_n = 2**20
    for n in range(3, max_n):
        if isprime(n):
            primes.append(n)
        
    p.disable()
    retq.put(Stats(p, stream="dummy")) #Dirty hack: set `stream` to something picklable then override later

if __name__ == "__main__":
    q = Queue()
    
    p1 = Process(target=foo, args=(q,))
    p1.start()
    
    p2 = Process(target=foo, args=(q,))
    p2.start()
    
    s1 = q.get()
    s1.stream = sys.stdout #restore original file
    s2 = q.get()
  # s2.stream #if we are just adding this `Stats` object to another the `stream` just gets thrown away anyway.
    
    s1.add(s2) #add up the stats from both child processes.
    s1.print_stats() #s1.stream gets used here, but not before. If you provide a file to write to instead of sys.stdout, it will write to that file)
    
    p1.join()
    p2.join()

【讨论】:

分析的目标是希望确定代码中的瓶颈所在。对我来说,您可能已经将存储所有图像的硬盘驱动器最大化,在这种情况下,您只是在已经受限的系统中增加了开销。剖析将帮助您确定但滞留在哪里 我在分析时收到以下错误,来自for ret in retvals: etc.concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending. @Dariyoush 我不测试不好。不幸的是,pstat.Stats 通常不可提取,这是从子进程发送回结果所必需的。典型的解决方案(据我从分析文档中可以理解)是将统计信息从子进程写入文件,然后重新打开这些文件以将它们加载到主进程中......或者,我们可以给出Stats 构造函数是一个可提取的假文件流,因为它实际上并不需要它。

以上是关于在这种情况下,为啥串行代码比 concurrent.futures 快?的主要内容,如果未能解决你的问题,请参考以下文章

在这种特殊情况下,为啥 gccgo 比 gc 慢?

为啥我使用 openMP atomic 的并行代码比串行代码花费更长的时间?

Python 没有接收到来自 Arduino Mega 2560 的第一行串行数据,而是接收到所有后续数据,为啥会发生这种情况?

为啥 python gevent 比串行慢?

为啥在这种情况下会生成不同的 go-assembler 代码?

为啥 selectAnnotation 在这种情况下不起作用?