concurrent.futures.Executor.map 中的异常处理

Posted

技术标签:

【中文标题】concurrent.futures.Executor.map 中的异常处理【英文标题】:Exception handling in concurrent.futures.Executor.map 【发布时间】:2018-12-06 20:33:56 【问题描述】:

来自https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map

如果 func 调用引发异常,则该异常将被引发 当它的值从迭代器中检索出来时。

下面的 sn -p 只输出第一个 exeption (Exeption: 1),然后停止。这是否与上述说法相矛盾?我希望以下内容能打印出循环中的所有异常。

def test_func(val):
  raise Exception(val)        

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:   
  for r in executor.map(test_func,[1,2,3,4,5]):
    try:
      print r
    except Exception as exc:
      print 'generated an exception: %s' % (exc)

【问题讨论】:

相关:***.com/questions/33448329/… 【参考方案1】:

Ehsan 的解决方案很好,但在完成时获取结果可能比等待列表中的连续项目完成更有效。这是来自library docs 的示例。

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = executor.submit(load_url, url, 60): url for url in URLS
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

【讨论】:

谢谢。对于为什么我不能让脚本的所有部分都执行,我感到很困惑。 感谢您的精彩解释【参考方案2】:

如上所述,不幸的是 executor.map 的 API 是有限的,只能让您获得第一个异常。此外,在遍历结果时,您只能获得第一个异常之前的值。

要回答您的问题,如果您不想使用其他库,您可以展开地图并手动应用每个功能:

future_list = []
with concurrent.futures.ThreadPoolExecutor() as executor:
  for arg in range(10):
    future = executor.submit(test_func, arg)
    future_list.append(future)

for future in future_list:
  try:
    print(future.result())
  except Exception as e:
    print(e)

这使您可以单独处理每个未来。

【讨论】:

【参考方案3】:

map 方法返回一个生成器,它允许在准备好后对结果进行迭代。

很遗憾,发生异常后无法恢复生成器。来自PEP 255。

如果未处理的异常(包括但不限于 StopIteration)由生成器函数引发或通过,则异常会以通常的方式传递给调用者,随后会尝试恢复生成器函数引发 StopIteration。换句话说,未处理的异常会终止生成器的使用寿命。

还有其他库,例如pebble,允许在发生错误后继续迭代。检查文档中的examples。

【讨论】:

谢谢。 pebble 是 python 中用于多线程和多处理的常用框架吗?现在python有自己的本机concurrent.futures模块不是多余的吗? pebble 克服了 Python 原生库的一些限制,例如上面的示例以及终止超时任务等其他问题。 Concurrent.futures 现在没有选项可以做这些事情吗? 不,它没有。这就是设计pebble 的原因。另一个值得检查的替代库是billiard 多么奇怪的行为......我想知道为什么会这样......【参考方案4】:

尽管其他人就捕获多个异常的正确方法给出了很好的答案,但我想回答为什么在问题中捕获异常的方法是错误的。以下sn-p:

class ExceptionA(Exception):
    pass


def test_func(val):
    raise ExceptionA(val)


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    try:
        for r in executor.map(test_func, [1, 2, 3, 4, 5]):
            try:
                print(r)
            except ExceptionA as exc:
                print(f'Catch inside: exc')

    except ExceptionA as exc:
        print(f'Catch outside: exc')

输出Catch outside: 1

python docs 内容如下:

如果 func 调用引发异常,则该异常将被引发 当它的值从迭代器中检索出来时。

这意味着如果你想捕捉异常,你需要在循环之外捕捉它,因为值是在循环语句而不是打印语句中检索的。

【讨论】:

以上是关于concurrent.futures.Executor.map 中的异常处理的主要内容,如果未能解决你的问题,请参考以下文章