ThreadPoolExecutor().map 与 ThreadPoolExecutor().submit 有何不同?

Posted

技术标签:

【中文标题】ThreadPoolExecutor().map 与 ThreadPoolExecutor().submit 有何不同?【英文标题】:How does ThreadPoolExecutor().map differ from ThreadPoolExecutor().submit? 【发布时间】:2014-01-17 06:10:34 【问题描述】:

我只是对我编写的一些代码感到非常困惑。我惊讶地发现:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(f, iterable))

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(map(lambda x: executor.submit(f, x), iterable))

产生不同的结果。第一个生成f 返回的任何类型的列表,第二个生成concurrent.futures.Future 对象的列表,然后需要使用它们的result() 方法评估这些对象以获得f 返回的值。

我主要担心的是这意味着executor.map 不能利用concurrent.futures.as_completed,这似乎是一种非常方便的方法来评估我正在对数据库进行的一些长期运行调用的结果它们变得可用。

我完全不清楚 concurrent.futures.ThreadPoolExecutor 对象是如何工作的——天真地,我更喜欢(稍微冗长一些):

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    results = [f.result() for f in futures.as_completed(result_futures)]

在更简洁的executor.map 上,以利用可能的性能提升。我这样做有错吗?

【问题讨论】:

【参考方案1】:

问题是您将ThreadPoolExecutor.map 的结果转换为列表。如果您不这样做,而是直接迭代生成的生成器,结果仍会按原始顺序生成,但循环会在所有结果准备好之前继续。你可以用这个例子来测试:

import time
import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
    print(i)

保留该顺序的原因可能是因为有时按照您为映射提供的相同顺序获得结果很重要。并且结果可能不会包含在未来的对象中,因为在某些情况下,如果您需要它们,可能需要太长时间才能在列表上执行另一个映射以获取所有结果。毕竟在大多数情况下,下一个值很可能在循环处理第一个值之前就准备好了。这在这个例子中得到了证明:

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor() # Or ProcessPoolExecutor
data = some_huge_list()
results = executor.map(crunch_number, data)
finals = []

for value in results:
    finals.append(do_some_stuff(value))

在此示例中,do_some_stuff 可能比crunch_number 花费更长的时间,如果确实如此,那么在您仍然保持 map 的简单使用的同时,性能确实不会有很大损失。

此外,由于工作线程(/进程)从列表的开头开始处理并一直工作到您提交的列表的末尾,因此结果应该按照迭代器已经产生的顺序完成。这意味着在大多数情况下executor.map 很好,但在某些情况下,例如,如果您处理值的顺序无关紧要并且传递给map 的函数需要非常不同的时间来运行,@ 987654328@ 可能更快。

【讨论】:

是的,我不关心这里的退货单,我更感兴趣的是尽快完成任务。我担心的是,当结果的顺序无关紧要时,executor.map 的性能将比在通过将executor.submit 映射到适当的可迭代对象上生成的生成器上使用futures.as_completed 更差。你知道是不是这样吗? 我认为我的答案还不清楚。 “正确的顺序”在这种情况下意味着这些值很可能按照它们在您提交给map 的列表中的顺序完成。对于大多数你想用 map 做的计算,这成立。因为相同的功能通常(并非总是)花费大致相同的时间来运行。但是,如果您有一个运行时非常不同的函数,使用future.as_completed 可能会更快。 先设置s = [1,2,4,8]再设置s = [8,4,2,1]解决了我的一些疑惑。【参考方案2】:

下面是.submit().map() 的示例。他们都立即接受工作(提交|映射 - 开始)。它们需要相同的时间来完成,11 秒(最后结果时间 - 开始)。但是,.submit() 会在 ThreadPoolExecutor maxThreads=2 中的任何线程完成后立即给出结果(无序!)。而.map() 按照提交的顺序给出结果。

import time
import concurrent.futures

def worker(i):
    time.sleep(i)
    return i,time.time()

e = concurrent.futures.ThreadPoolExecutor(2)
arrIn = range(1,7)[::-1]
print arrIn

f = []
print 'start submit',time.time()
for i in arrIn:
    f.append(e.submit(worker,i))
print 'submitted',time.time()
for r in concurrent.futures.as_completed(f):
    print r.result(),time.time()
print

f = []
print 'start map',time.time()
f = e.map(worker,arrIn)
print 'mapped',time.time()
for r in f:
    print r,time.time()    

输出:

[6, 5, 4, 3, 2, 1]
start submit 1543473934.47
submitted 1543473934.47
(5, 1543473939.473743) 1543473939.47
(6, 1543473940.471591) 1543473940.47
(3, 1543473943.473639) 1543473943.47
(4, 1543473943.474192) 1543473943.47
(1, 1543473944.474617) 1543473944.47
(2, 1543473945.477609) 1543473945.48

start map 1543473945.48
mapped 1543473945.48
(6, 1543473951.483908) 1543473951.48
(5, 1543473950.484109) 1543473951.48
(4, 1543473954.48858) 1543473954.49
(3, 1543473954.488384) 1543473954.49
(2, 1543473956.493789) 1543473956.49
(1, 1543473955.493888) 1543473956.49

【讨论】:

这是一个很好的答案。为示例 +1。【参考方案3】:

除了此处答案中的解释之外,直接找到源代码可能会有所帮助。它重申了此处另一个答案的声明:

.map() 按提交顺序给出结果,而 用concurrent.futures.as_completed() 遍历Future 对象列表并不能保证这种排序,因为这是as_completed() 的本质

.map()在基类concurrent.futures._base.Executor中定义:

class Executor(object):
    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]  # <!!!!!!!!

        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()  # <!!!!!!!!
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()  # <!!!!!!!!
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

正如你提到的,还有.submit(),它留待在子类中定义,即ProcessPoolExecutorThreadPoolExecutor,并返回一个_base.Future实例,你需要调用.result()实际上做任何事情。

.map() 中的重要语句归结为:

fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
    yield fs.pop().result()

.reverse() 加上.pop() 是让第一个提交的结果(来自iterables)首先产生,第二个提交的结果第二个产生的方法,依此类推。结果迭代器的元素不是Futures;它们本身就是实际结果。

【讨论】:

【参考方案4】:

如果你使用concurrent.futures.as_completed,你可以处理每个函数的异常。

import concurrent.futures
iterable = [1,2,3,4,6,7,8,9,10]

def f(x):
    if x == 2:
        raise Exception('x')
    return x

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    for future in concurrent.futures.as_completed(result_futures):
        try:
            print('resutl is', future.result())
        except Exception as e:
            print('e is', e, type(e))
# resutl is 3
# resutl is 1
# resutl is 4
# e is x <class 'Exception'>
# resutl is 6
# resutl is 7
# resutl is 8
# resutl is 9
# resutl is 10

executor.map中,如果出现异常,整个executor都会停止。您需要在工作函数中处理异常。

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for each in executor.map(f, iterable):
        print(each)
# if there is any exception, executor.map would stop

【讨论】:

以上是关于ThreadPoolExecutor().map 与 ThreadPoolExecutor().submit 有何不同?的主要内容,如果未能解决你的问题,请参考以下文章

Python concurrent库简单用法

将 InheritableThreadLocal 与 ThreadPoolExecutor - 或 - 不重用线程的 ThreadPoolExecutor 一起使用

ThreadPoolExecutor的参数介绍

ThreadPoolExecutor的一种扩展办法

JavaFX 多线程之 ThreadPoolExecutor 线程池

[转发]对ThreadPoolExecutor初识