concurrent.futures.Executor.map 中的异常处理
Posted
技术标签:
【中文标题】concurrent.futures.Executor.map 中的异常处理【英文标题】:Exception handling in concurrent.futures.Executor.map 【发布时间】:2018-12-06 20:33:56 【问题描述】:来自https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map
如果 func 调用引发异常,则该异常将被引发 当它的值从迭代器中检索出来时。
下面的 sn -p 只输出第一个 exeption (Exeption: 1),然后停止。这是否与上述说法相矛盾?我希望以下内容能打印出循环中的所有异常。
def test_func(val):
raise Exception(val)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for r in executor.map(test_func,[1,2,3,4,5]):
try:
print r
except Exception as exc:
print 'generated an exception: %s' % (exc)
【问题讨论】:
相关:***.com/questions/33448329/… 【参考方案1】:Ehsan 的解决方案很好,但在完成时获取结果可能比等待列表中的连续项目完成更有效。这是来自library docs 的示例。
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = executor.submit(load_url, url, 60): url for url in URLS
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
【讨论】:
谢谢。对于为什么我不能让脚本的所有部分都执行,我感到很困惑。 感谢您的精彩解释【参考方案2】:如上所述,不幸的是 executor.map 的 API 是有限的,只能让您获得第一个异常。此外,在遍历结果时,您只能获得第一个异常之前的值。
要回答您的问题,如果您不想使用其他库,您可以展开地图并手动应用每个功能:
future_list = []
with concurrent.futures.ThreadPoolExecutor() as executor:
for arg in range(10):
future = executor.submit(test_func, arg)
future_list.append(future)
for future in future_list:
try:
print(future.result())
except Exception as e:
print(e)
这使您可以单独处理每个未来。
【讨论】:
【参考方案3】:map
方法返回一个生成器,它允许在准备好后对结果进行迭代。
很遗憾,发生异常后无法恢复生成器。来自PEP 255。
如果未处理的异常(包括但不限于 StopIteration)由生成器函数引发或通过,则异常会以通常的方式传递给调用者,随后会尝试恢复生成器函数引发 StopIteration。换句话说,未处理的异常会终止生成器的使用寿命。
还有其他库,例如pebble
,允许在发生错误后继续迭代。检查文档中的examples。
【讨论】:
谢谢。 pebble 是 python 中用于多线程和多处理的常用框架吗?现在python有自己的本机concurrent.futures模块不是多余的吗?pebble
克服了 Python 原生库的一些限制,例如上面的示例以及终止超时任务等其他问题。
Concurrent.futures 现在没有选项可以做这些事情吗?
不,它没有。这就是设计pebble
的原因。另一个值得检查的替代库是billiard
。
多么奇怪的行为......我想知道为什么会这样......【参考方案4】:
尽管其他人就捕获多个异常的正确方法给出了很好的答案,但我想回答为什么在问题中捕获异常的方法是错误的。以下sn-p:
class ExceptionA(Exception):
pass
def test_func(val):
raise ExceptionA(val)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
try:
for r in executor.map(test_func, [1, 2, 3, 4, 5]):
try:
print(r)
except ExceptionA as exc:
print(f'Catch inside: exc')
except ExceptionA as exc:
print(f'Catch outside: exc')
输出Catch outside: 1
。
python docs 内容如下:
如果 func 调用引发异常,则该异常将被引发 当它的值从迭代器中检索出来时。
这意味着如果你想捕捉异常,你需要在循环之外捕捉它,因为值是在循环语句而不是打印语句中检索的。
【讨论】:
以上是关于concurrent.futures.Executor.map 中的异常处理的主要内容,如果未能解决你的问题,请参考以下文章