python concurrent.futures.ProcessPoolExecutor:.submit() 与 .map() 的性能

Posted

技术标签:

【中文标题】python concurrent.futures.ProcessPoolExecutor:.submit() 与 .map() 的性能【英文标题】:python concurrent.futures.ProcessPoolExecutor: Performance of .submit() vs .map() 【发布时间】:2017-06-23 18:59:23 【问题描述】:

我正在使用concurrent.futures.ProcessPoolExecutor 从数字范围中查找数字的出现。目的是调查从并发中获得的加速性能量。为了对性能进行基准测试,我有一个控件 - 一个执行所述任务的串行代码(如下所示)。我编写了 2 个并发代码,一个使用 concurrent.futures.ProcessPoolExecutor.submit(),另一个使用 concurrent.futures.ProcessPoolExecutor.map() 来执行相同的任务。它们如下所示。关于起草前者和后者的建议可以分别看到here 和here。

发给所有三个代码的任务是在 0 到 1E8 的数字范围内查找数字 5 的出现次数。 .submit().map() 都分配了 6 个 worker,.map() 的块大小为 10,000。在并发代码中离散化工作负载的方式是相同的。但是,用于在两个代码中查找出现的函数是不同的。这是因为参数传递给.submit().map() 调用的函数的方式不同。

所有 3 个代码都报告了相同的出现次数,即 56,953,279 次。但是,完成任务所需的时间却大不相同。 .submit() 的执行速度比控件快 2 倍,而 .map() 完成任务所需的时间是控件的两倍。

问题:

    我想知道.map()的缓慢性能是我的编码造成的,还是天生就慢?”如果是前者,我该如何改进它。我很惊讶它的性能比控件慢因为没有太多使用它的动力。 我想知道是否有办法让.submit() 代码执行得更快。我的一个条件是函数 _concurrent_submit() 必须返回一个包含数字 5 的数字/出现次数的可迭代对象。

基准测试结果

concurrent.futures.ProcessPoolExecutor.submit()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from traceback import print_exc

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found 0 in 1:.4fsec".format(len(match),end))
    return match

def _concurrent_submit(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
        # 2.2. Instruct workers to process results as they come, when all are
        #      completed or .....
        cf.as_completed(futures) # faster than cf.wait()
        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future.result():
                try:
                    found.append(f)
                except:
                    print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent_submit():')
        print("found 0 in 1:.4fsec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_submit(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found 0 in 1:.4fsec".format(len(a),end))

concurrent.futures.ProcessPoolExecutor.map()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc

def _findmatch(listnumber, number):    
    '''Function to find the occurrence of number in another number and return
       a string value.'''
    #print('def _findmatch(listnumber, number):')
    #print('listnumber = 0 and ref = 1'.format(listnumber, number))
    if number in str(listnumber):
        x = listnumber
        #print('x = 0'.format(x))
        return x 

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            numberlist = range(cstart, cstop)
            futures.append(executor.map(_findmatch, numberlist,
                                        itertools.repeat(number),
                                        chunksize=10000))
        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future:
                if f:
                    try:
                        found.append(f)
                    except:
                        print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found 0 in 1:.4fsec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_map(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found 0 in 1:.4fsec".format(len(a),end))

序列号:

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

from time import time

def _serial(nmax, number):    
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):match.append(n)
    end=time()-start
    print("found 0 in 1:.4fsec".format(len(match),end))
    return match

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.

    start = time()
    a = _serial(nmax, number)
    end = time() - start
    print('\n main')
    print("found 0 in 1:.4fsec".format(len(a),end))

2017 年 2 月 13 日更新:

除了@niemmi 的回答,我还提供了一些个人研究后的回答:

    如何进一步加速 @niemmi 的 .map().submit() 解决方案,以及 当ProcessPoolExecutor.map() 可以比ProcessPoolExecutor.submit() 带来更多的加速时。

【问题讨论】:

【参考方案1】:

您在这里将苹果与橙子进行比较。使用map 时,您会生成所有1E8 数字并将它们传输到工作进程。与实际执行相比,这需要大量时间。使用 submit 时,您只需创建 6 组要传输的参数。

如果您更改map 以使用相同的原理进行操作,您将获得彼此接近的数字:

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found 0 in 1:.4fsec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunk * i for i in range(workers))
        cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1))
        futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number))

        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future:
                try:
                    found.append(f)
                except:
                    print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found 0 in 1:.4fsec".format(foundsize, end))
    return found

正确使用as_completed可以提高提交的性能。对于给定的期货迭代,它将返回一个迭代器,该迭代器将按照它们完成的顺序 yield 期货。

您也可以跳过将数据复制到另一个数组并使用itertools.chain.from_iterable 将来自期货的结果组合成单个可迭代对象:

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
from itertools import chain

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found 0 in 1:.4fsec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))

    return chain.from_iterable(f.result() for f in cf.as_completed(futures))

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_map(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found 0 in 1:.4fsec".format(sum(1 for x in a),end))

【讨论】:

我刚刚研究了您的.map() 解决方案。哇..您重写cstartcstop 以将其应用于_findmatch().map() 的方式非常巧妙。我没想到我可以这样做。第一次使用.map()。这就是为什么.map() 代码中的_findmatch 与.submit() 代码和控制代码中的不同,这导致了苹果与橙色的比较。 ;) 我试图在 .map() 中包含 chunksize,但发现它会导致性能变慢。 chunksize 越大,.map 代码执行的速度就越慢。你能帮我理解为什么会这样吗? @SunBear 如果您使用我的地图版本,应该有简单的解释。假设您的机器上有 2 个内核,这意味着如果您正确地并行化工作,它可以在一半的时间内完成。现在地图实现将工作分为 6 个部分。假设您定义了chunksize=5,其中一名工作人员获得了 6 个部分中的 5 个,导致 5/6 的工作在其中一个核心上处理。一般来说,使用更大的块大小是有意义的,但前提是它允许工作在工作人员之间平均分配。尝试使用您原来的 submit 降低块大小,您应该会看到它变慢了。 我遵循您的推理直到 5/6 的工作正在其中一个核心上处理。 ' 当 chunksize= 10 时会发生什么?这是否意味着所有 6 人进入 1 名工人而其他工人处于空闲状态?额外的块大小是什么意思?对不起,我在这里有点慢。顺便说一句,在弄清楚你的 chunksize 和 .map() chunksize 如何影响计算速度时,我发现了一些有趣的东西。请参阅我对您的附加答案。我认为交互导致块数/工人数 我比较了 .submit() 代码。使用 6 名工作人员和 5 次运行,您的代码的平均计算时间比我的问题中发布的 .submit() 代码的平均计算时间快约 1.4 倍。您的代码的平均时间是 6.41 秒。哇..太棒了!将 .submit() 代码和 .map() .code 与我建议的更改进行比较,.submit() 代码仍然更快。 @SunBear 请注意,在我的解决方案中,从 iterable 产生的数字没有排序。节省时间的原因是没有将数字复制到主进程中的列表,也不需要等待包含数字 5xxxxxxx 的块完成,然后再使用后面的结果。稍后我将尝试根据 cmets 和您的答案扩展我的答案。【参考方案2】:

概述:

我的回答分为两部分:

第 1 部分展示了如何从 @niemmi 的 ProcessPoolExecutor.map() 解决方案中获得更多的加速。 第 2 部分展示了 ProcessPoolExecutor 的子类 .submit().map() 何时产生不等价的计算时间。

============================================== ============================

第 1 部分:ProcessPoolExecutor.map() 的更多加速​​

背景: 本部分建立在@niemmi 的.map() 解决方案之上,该解决方案本身就非常出色。在对他的离散化方案进行一些研究以更好地理解它如何与 .map() 块大小争论交互时,我发现了这个有趣的解决方案。

我认为@niemmi 对chunk = nmax // workers 的定义是对块大小的定义,即工作池中每个工作人员要处理的实际数字范围(给定任务)的较小大小。现在,这个定义的前提是假设一台计算机有 x 个工人,在每个工人之间平均分配任务将导致每个工人的最佳使用,因此总任务将最快完成。因此,将给定任务分解成的块数应始终等于池工作人员的数量。然而,这个假设正确吗?

主张:在这里,我提出上述假设在与ProcessPoolExecutor.map() 一起使用时并不总是导致最快的计算时间。相反,将任务离散化到大于池工作人员数量的数量可以导致加速,即更快地完成给定任务

实验:我修改了@niemmi 的代码,允许离散化任务的数量超过池工作人员的数量。该代码如下所示,用于计算数字 5 在 0 到 1E8 的数字范围内出现的次数。我已经使用 1、2、4 和 6 个池工作人员执行了这段代码,并针对离散任务数量与池工作人员数量的不同比率。对于每个场景,进行 3 次运行并将计算时间制成表格。 “加速”在这里被定义为当离散任务的数量大于池工作者的数量时,在平均计算时间内使用相同数量的块和池工作者的平均计算时间。

调查结果:

    左图显示了实验部分提到的所有场景所花费的计算时间。它表明块数/工人数=1所花费的计算时间总是大于块数>工人数所花费的计算时间. 也就是说,前一种情况总是比后者效率低。

    右图显示块数/工人数达到14或更多的阈值时,获得了1.2倍或更多的加速。有趣的是,当ProcessPoolExecutor.map() 与 1 个工作人员一起执行时,也出现了加速趋势。

结论:在自定义 ProcessPoolExecutor.map()` 用于解决给定任务的离散任务数量时,谨慎的做法是确保该数量大于池工作人员的数量,因为这种做法缩短了计算时间。

concurrent.futures.ProcessPoolExecutor.map() 代码。 (仅限修订部分)

def _concurrent_map(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop,
                               itertools.repeat(number))
        # 2.2. Consolidate result as a list and return this list.
        for future in futures:
            #print('type(future)=',type(future))
            for f in future:
                if f:
                    try:
                        found.append(f)
                    except:
                        print_exc()
        foundsize = len(found)
        end = time() - start
        print('\n within statement of def _concurrent(nmax, number):')
        print("found 0 in 1:.4fsec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 4     # Pool of workers
    chunks_vs_workers = 14 # A factor of =>14 can provide optimum performance  
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    a = _concurrent_map(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax=, workers=, num_of_chunks='.format(
          nmax, workers, num_of_chunks))
    print('workers = ', workers)
    print("found 0 in 1:.4fsec".format(len(a),end))

============================================== ============================

第 2 部分:使用 ProcessPoolExecutor 子类 .submit() 和 .map() 的总计算时间在返回排序/排序结果列表时可能不同。

背景:我已经修改了 .submit().map() 代码,以允许对它们的计算时间进行“苹果对苹果”的比较,并能够可视化计算时间主代码,主代码调用执行并发操作的 _concurrent 方法的计算时间,以及 _concurrent 方法调用的每个离散化任务/工作者的计算时间。此外,这些代码中的并发方法被构造为直接从.submit() 的未来对象和.map() 的迭代器返回结果的无序和有序列表。下面提供了源代码(希望对您有所帮助。)。

实验 这两个新改进的代码用于执行第 1 部分中描述的相同实验,不同之处在于只考虑了 6 个池工作人员和 python 内置的 listsorted 方法分别用于将结果的无序和有序列表返回到代码的主要部分。

调查结果:

    从_concurrent方法的结果,我们可以看到_concurrent方法用于创建ProcessPoolExecutor.submit()的所有Future对象,以及创建ProcessPoolExecutor.map()的迭代器的计算次数,作为离散任务数的函数超过池工人的数量,是相等的。这个结果仅仅意味着ProcessPoolExecutor 子类.submit().map() 具有同样的效率/速度。 比较 main 和它的 _concurrent 方法的计算时间,我们可以看到 main 运行的时间比它的 _concurrent 方法长。这是意料之中的,因为它们的时间差反映了listsorted 方法(以及包含在这些方法中的其他方法)的计算时间量。很明显,list 方法返回结果列表所需的计算时间少于sorted 方法。 .submit() 和 .map() 代码的 list 方法的平均计算时间相似,约为 0.47 秒。 .submit() 和 .map() 代码的排序方法的平均计算时间分别为 1.23 秒和 1.01 秒。换句话说,对于 .submit() 和 .map() 代码,list 方法的执行速度分别比 sorted 方法快 2.62 倍和 2.15 倍。 不清楚为什么sorted 方法从 .map().submit() 快,因为离散化的数量 任务增加的数量超过了池工作者的数量,保存时 离散化任务的数量等于池工作人员的数量。 也就是说,这些发现表明,使用同样快的.submit().map() 子类的决定可能会受到排序方法的阻碍。例如,如果打算在尽可能短的时间内生成有序列表,则应该优先使用 ProcessPoolExecutor.map() 而不是ProcessPoolExecutor.submit(),因为.map() 可以允许最短的总计算时间。 此处显示了我的答案第 1 部分中提到的离散化方案,以加快 .submit().map() 子类的性能。与离散化任务数量等于池工作人员数量的情况相比,加速量可高达 20%。

改进的 .map() 代码

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from itertools import repeat, chain 


def _findmatch(nmin, nmax, number):
    '''Function to find the occurence of number in range nmin to nmax and return
       the found occurences in a list.'''
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    #print("\n def _findmatch 0:<10 1:<10 2:<3 found 3:8 in 4:.4fsec".
    #      format(nmin, nmax, number, len(match),end))
    return match

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop, repeat(number))
    end = time() - start
    print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
    print("found in 0:.4fsec".format(end))
    return list(chain.from_iterable(futures)) #Return an unordered result list
    #return sorted(chain.from_iterable(futures)) #Return an ordered result list

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers
    chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance 
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    found = _concurrent(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax=, workers=, num_of_chunks='.format(
          nmax, workers, num_of_chunks))
    #print('found = ', found)
    print("found 0 in 1:.4fsec".format(len(found),end))    

改进的 .submit() 代码。 此代码与 .map 代码相同,只是您将 _concurrent 方法替换为以下内容:

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(num_of_chunks):
            cstart = chunksize * i
            cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    end = time() - start
    print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
    print("found in 0:.4fsec".format(end))
    return list(chain.from_iterable(f.result() for f in cf.as_completed(
        futures))) #Return an unordered list
    #return list(chain.from_iterable(f.result() for f in cf.as_completed(
    #    futures))) #Return an ordered list

============================================= ============================

【讨论】:

以上是关于python concurrent.futures.ProcessPoolExecutor:.submit() 与 .map() 的性能的主要内容,如果未能解决你的问题,请参考以下文章

python concurrent.futures

python的multiprocessing和concurrent.futures有啥区别?

Python:Concurrent.Futures 错误 [TypeError:'NoneType' 对象不可调用]

python并发模块之concurrent.futures

python简单粗暴多线程之concurrent.futures

Python:inotify、concurrent.futures - 如何添加现有文件