python的多处理池的键盘中断
Posted
技术标签:
【中文标题】python的多处理池的键盘中断【英文标题】:Keyboard Interrupts with python's multiprocessing Pool 【发布时间】:2010-11-27 08:37:03 【问题描述】:如何使用 python 的多处理池处理 KeyboardInterrupt 事件?这是一个简单的例子:
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
sleep(1)
return i*i
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
# **** THIS PART NEVER EXECUTES. ****
pool.terminate()
print "You cancelled the program!"
sys.exit(1)
print "\nFinally, here are the results: ", results
if __name__ == "__main__":
go()
运行上面的代码时,当我按下^C
时,KeyboardInterrupt
会被提升,但该进程会在此时挂起,我必须在外部将其杀死。
我希望能够随时按^C
并让所有进程正常退出。
【问题讨论】:
我使用 psutil 解决了我的问题,您可以在此处查看解决方案:***.com/questions/32160054/… 【参考方案1】:奇怪的是,您似乎还必须处理孩子中的KeyboardInterrupt
。我本来希望这能像书面一样工作......尝试将slowly_square
更改为:
def slowly_square(i):
try:
sleep(1)
return i * i
except KeyboardInterrupt:
print 'You EVIL bastard!'
return 0
这应该可以按您的预期工作。
【讨论】:
我试过了,它实际上并没有终止整个作业集。它会终止当前正在运行的作业,但脚本仍会在 pool.map 调用中分配剩余的作业,就好像一切正常一样。 这没关系,但您可能会忘记发生的错误。使用堆栈跟踪返回错误可能会起作用,因此父进程可以知道发生了错误,但当错误发生时它仍然不会立即退出。【参考方案2】:这是一个 Python 错误。在 threading.Condition.wait() 中等待条件时,永远不会发送 KeyboardInterrupt。再现:
import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"
KeyboardInterrupt 异常在 wait() 返回之前不会被传递,并且它永远不会返回,因此中断永远不会发生。 KeyboardInterrupt 几乎肯定会中断条件等待。
请注意,如果指定了超时,则不会发生这种情况; cond.wait(1) 将立即收到中断。因此,一种解决方法是指定超时。为此,请替换
results = pool.map(slowly_square, range(40))
与
results = pool.map_async(slowly_square, range(40)).get(9999999)
或类似的。
【讨论】:
官方 python 跟踪器中的这个错误是否存在?我很难找到它,但我可能只是没有使用最好的搜索词。 此错误已归档为 [问题 8296][1]。 [1]:bugs.python.org/issue8296 这并不能完全解决问题。有时,当我按下 Control+C 时,我会得到预期的行为,有时则不会。我不知道为什么,但看起来 KeyboardInterrupt 可能是由其中一个进程随机接收的,如果父进程是捕获它的那个,我只会得到正确的行为。 这不适用于 Windows 上的 Python 3.6.1。当我执行 Ctrl-C 时,我会得到大量的堆栈跟踪和其他垃圾,即与没有这种解决方法的情况相同。事实上,我从这个线程中尝试过的所有解决方案似乎都不起作用...... jehejj,2019年还没有定下来。像并行做IO是个新奇的想法:/【参考方案3】:由于某些原因,只有从基类 Exception
继承的异常才能正常处理。作为一种解决方法,您可以将您的 KeyboardInterrupt
重新提升为 Exception
实例:
from multiprocessing import Pool
import time
class KeyboardInterruptError(Exception): pass
def f(x):
try:
time.sleep(x)
return x
except KeyboardInterrupt:
raise KeyboardInterruptError()
def main():
p = Pool(processes=4)
try:
print 'starting the pool map'
print p.map(f, range(10))
p.close()
print 'pool map complete'
except KeyboardInterrupt:
print 'got ^C while pool mapping, terminating the pool'
p.terminate()
print 'pool is terminated'
except Exception, e:
print 'got exception: %r, terminating the pool' % (e,)
p.terminate()
print 'pool is terminated'
finally:
print 'joining pool processes'
p.join()
print 'join complete'
print 'the end'
if __name__ == '__main__':
main()
通常你会得到以下输出:
staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end
所以如果你点击^C
,你会得到:
staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
【讨论】:
看来这不是一个完整的解决方案。如果在multiprocessing
执行自己的IPC 数据交换时到达KeyboardInterrupt
,那么try..catch
将不会被激活(显然)。
您可以将raise KeyboardInterruptError
替换为return
。您只需要确保在收到 KeyboardInterrupt 后子进程就结束。返回值似乎被忽略了,在main
中仍然收到了KeyboardInterrupt。【参考方案4】:
我发现,目前最好的解决方案是不使用 multiprocessing.pool 功能,而是使用您自己的池功能。我提供了一个演示 apply_async 错误的示例,以及一个演示如何完全避免使用池功能的示例。
http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/
【讨论】:
像魅力一样工作。这是一个干净的解决方案,而不是某种 hack(/me 认为)。顺便说一句,其他人提出的 .get(99999) 技巧会严重损害性能。 我没有注意到使用超时有任何性能损失,尽管我一直使用 9999 而不是 999999。例外情况是引发不从 Exception 类继承的异常:然后你必须等到超时。解决方案是捕获所有异常(请参阅我的解决方案)。【参考方案5】:根据我最近的发现,最好的解决方案是设置工作进程以完全忽略 SIGINT,并将所有清理代码限制在父进程中。这解决了空闲和忙碌工作进程的问题,并且不需要您的子进程中的错误处理代码。
import signal
...
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
...
def main()
pool = multiprocessing.Pool(size, init_worker)
...
except KeyboardInterrupt:
pool.terminate()
pool.join()
解释和完整的示例代码可以分别在http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/和http://github.com/jreese/multiprocessing-keyboardinterrupt找到。
【讨论】:
嗨,约翰。您的解决方案与我的解决方案不同,不幸的是,解决方案很复杂。它隐藏在主进程中的time.sleep(10)
后面。如果您要删除该睡眠,或者如果您等到进程尝试加入池,您必须这样做以保证作业完成,那么您仍然会遇到同样的问题,即主进程没有'在等待轮询 join
操作时不会收到 KeyboardInterrupt。
在我在生产中使用此代码的情况下,time.sleep() 是循环的一部分,该循环将检查每个子进程的状态,然后延迟重新启动某些进程,如果必要的。与等待所有进程完成的 join() 不同,它会单独检查它们,确保主进程保持响应。
所以它更像是一个繁忙的等待(可能在检查之间有小的睡眠),通过另一种方法而不是加入来轮询进程完成?如果是这种情况,或许最好将此代码包含在您的博客文章中,因为这样您就可以保证所有工作人员在尝试加入之前都已完成。
这不起作用。只有孩子会收到信号。父级永远不会收到它,所以pool.terminate()
永远不会被执行。让孩子们忽略这个信号是没有任何作用的。 @Glenn 的回答解决了这个问题。
我的版本是gist.github.com/admackin/003dd646e5fadee8b8d6;它不会调用 .join()
,除非在中断时 - 它只是使用 AsyncResult.ready()
手动检查 .apply_async()
的结果以查看它是否准备好,这意味着我们已经干净地完成了。【参考方案6】:
通常这种简单的结构适用于 Pool 上的 Ctrl-C :
def signal_handle(_signal, frame):
print "Stopping the Jobs."
signal.signal(signal.SIGINT, signal_handle)
正如一些类似的帖子所述:
Capture keyboardinterrupt in Python without try-except
【讨论】:
这也必须在每个工作进程上完成,如果在多处理库初始化时引发 KeyboardInterrupt,仍然可能会失败。【参考方案7】:似乎有两个问题会导致多处理烦人时出现异常。第一个(Glenn 指出)是您需要使用 map_async
超时而不是 map
以获得立即响应(即,不要完成整个列表的处理)。第二个(由 Andrey 指出)是多处理不会捕获不从 Exception
继承的异常(例如,SystemExit
)。所以这是我处理这两个问题的解决方案:
import sys
import functools
import traceback
import multiprocessing
def _poolFunctionWrapper(function, arg):
"""Run function under the pool
Wrapper around function to catch exceptions that don't inherit from
Exception (which aren't caught by multiprocessing, so that you end
up hitting the timeout).
"""
try:
return function(arg)
except:
cls, exc, tb = sys.exc_info()
if issubclass(cls, Exception):
raise # No worries
# Need to wrap the exception with something multiprocessing will recognise
import traceback
print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))
def _runPool(pool, timeout, function, iterable):
"""Run the pool
Wrapper around pool.map_async, to handle timeout. This is required so as to
trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
http://***.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
Further wraps the function in _poolFunctionWrapper to catch exceptions
that don't inherit from Exception.
"""
return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)
def myMap(function, iterable, numProcesses=1, timeout=9999):
"""Run the function on the iterable, optionally with multiprocessing"""
if numProcesses > 1:
pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
mapFunc = functools.partial(_runPool, pool, timeout)
else:
pool = None
mapFunc = map
results = mapFunc(function, iterable)
if pool is not None:
pool.close()
pool.join()
return results
【讨论】:
我没有注意到任何性能损失,但就我而言,function
的寿命相当长(数百秒)。
实际上已经不是这样了,至少从我的眼睛和经验来看。如果您在各个子进程中捕获键盘异常并在主进程中再次捕获它,那么您可以继续使用map
,一切都很好。 @Linux Cli Aik
在下面提供了一个产生这种行为的解决方案。如果主线程依赖于子进程的结果,则并不总是需要使用map_async
。【参考方案8】:
我是 Python 的新手。我到处寻找答案,偶然发现了这个以及其他一些博客和 youtube 视频。我试图复制粘贴上面作者的代码并在 Windows 7 64 位的 python 2.7.13 上重现它。它接近我想要实现的目标。
我让我的子进程忽略 ControlC 并使父进程终止。看起来绕过子进程确实为我避免了这个问题。
#!/usr/bin/python
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
try:
print "<slowly_square> Sleeping and later running a square calculation..."
sleep(1)
return i * i
except KeyboardInterrupt:
print "<child processor> Don't care if you say CtrlC"
pass
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
pool.terminate()
pool.close()
print "You cancelled the program!"
exit(1)
print "Finally, here are the results", results
if __name__ == '__main__':
go()
从pool.terminate()
开始的部分似乎永远不会执行。
【讨论】:
我也刚刚想通了!老实说,我认为这是解决此类问题的最佳方法。公认的解决方案将map_async
强制到用户身上,我不是特别喜欢。在许多情况下,例如我的情况,主线程需要等待各个进程完成。这也是map
存在的原因之一!【参考方案9】:
投票的答案没有解决核心问题,而是类似的副作用。
多处理库的作者 Jesse Noller 解释了在旧的 blog post 中使用 multiprocessing.Pool
时如何正确处理 CTRL+C。
import signal
from multiprocessing import Pool
def initializer():
"""Ignore CTRL+C in the worker process."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(initializer=initializer)
try:
pool.map(perform_download, dowloads)
except KeyboardInterrupt:
pool.terminate()
pool.join()
【讨论】:
我发现 ProcessPoolExecutor 也有同样的问题。我能找到的唯一解决方法是从未来调用os.setpgrp()
当然,唯一的区别是ProcessPoolExecutor
不支持初始化函数。在 Unix 上,您可以通过在创建池之前禁用主进程上的 sighandler 并在之后重新启用它来利用 fork
策略。在pebble 中,我默认对子进程保持沉默SIGINT
。我不知道他们对 Python 池不这样做的原因。最后,用户可以重新设置SIGINT
处理程序,以防他/她想伤害自己。
这个解决方案似乎也可以防止 Ctrl-C 中断主进程。
我刚刚在 Python 3.5 上进行了测试,它工作正常,你使用的是什么版本的 Python?什么操作系统?【参考方案10】:
您可以尝试使用 Pool 对象的 apply_async 方法,如下所示:
import multiprocessing
import time
from datetime import datetime
def test_func(x):
time.sleep(2)
return x**2
def apply_multiprocessing(input_list, input_function):
pool_size = 5
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)
try:
jobs =
for value in input_list:
jobs[value] = pool.apply_async(input_function, [value])
results =
for value, result in jobs.items():
try:
results[value] = result.get()
except KeyboardInterrupt:
print "Interrupted by user"
pool.terminate()
break
except Exception as e:
results[value] = e
return results
except Exception:
raise
finally:
pool.close()
pool.join()
if __name__ == "__main__":
iterations = range(100)
t0 = datetime.now()
results1 = apply_multiprocessing(iterations, test_func)
t1 = datetime.now()
print results1
print "Multi: ".format(t1 - t0)
t2 = datetime.now()
results2 = i: test_func(i) for i in iterations
t3 = datetime.now()
print results2
print "Non-multi: ".format(t3 - t2)
输出:
100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000
这种方法的一个优点是中断前处理的结果会在结果字典中返回:
>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25
【讨论】:
光荣完整的例子 很好的例子。 谢谢。我试图弄清楚这如何推广到多个论点。特别是,为什么jobs[value] = pool.apply_async(input_function, [value])
中传递[value]
而不是value
?
是否有可能让中断的进程返回一个中间结果?【参考方案11】:
如果您正在执行诸如 Pool.map
之类的方法,这些答案中的许多都是旧的和/或它们似乎不适用于 Windows 上的 更高版本的 Python(我正在运行 3.8.5),这会阻止直到所有提交的任务都完成。以下是我的解决方案。
-
在主进程中调用
signal.signal(signal.SIGINT, signal.SIG_IGN)
以完全忽略 Ctrl-C。
处理池将使用池初始化程序进行初始化,从而初始化每个处理器:全局变量 ctrl_c_entered
将设置为 False
,并且对 signal.signal(signal.SIGINT, signal.SIG_IGN)
的调用将首先发出 忽略 Ctrl-C。此调用的返回值将被保存;这是原始的默认处理程序,重新建立后允许处理 KyboardInterrupt
异常。
装饰器handle_ctrl_c
可用于装饰应在输入Ctrl-C 时立即退出的多处理函数和方法。这个装饰器将测试是否设置了全局 ctrl_c_entered
标志,如果设置了,甚至不需要运行函数/方法,而是返回一个 KeyboardInterrupt
异常实例。否则,将建立 KeyboardInterrupt
的 try/catch 处理程序,并调用修饰的函数/方法。如果输入 Ctrl-C,则全局 ctrl_c_entered
将设置为 True
并返回一个 KeyboardInterrupt
异常实例。无论如何,在返回之前,装饰器将重新建立 SIG_IGN 处理程序。
本质上,所有提交的任务都将被允许启动,但一旦输入 Ctrl-C,将立即终止并返回 KeyBoardInterrupt
异常。主进程可以测试返回值是否存在这样的返回值来检测是否输入了Ctrl-C。
from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps
def handle_ctrl_c(func):
@wraps(func)
def wrapper(*args, **kwargs):
global ctrl_c_entered
if not ctrl_c_entered:
signal.signal(signal.SIGINT, default_sigint_handler) # the default
try:
return func(*args, **kwargs)
except KeyboardInterrupt:
ctrl_c_entered = True
return KeyboardInterrupt()
finally:
signal.signal(signal.SIGINT, pool_ctrl_c_handler)
else:
return KeyboardInterrupt()
return wrapper
@handle_ctrl_c
def slowly_square(i):
sleep(1)
return i*i
def pool_ctrl_c_handler(*args, **kwargs):
global ctrl_c_entered
ctrl_c_entered = True
def init_pool():
# set global variable for each process in the pool:
global ctrl_c_entered
global default_sigint_handler
ctrl_c_entered = False
default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)
def main():
signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(initializer=init_pool)
results = pool.map(slowly_square, range(40))
if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
print('Ctrl-C was entered.')
else:
print(results)
if __name__ == '__main__':
main()
【讨论】:
确认这在 Windows 上的 Python 3.7.7 上按预期工作。感谢发帖!以上是关于python的多处理池的键盘中断的主要内容,如果未能解决你的问题,请参考以下文章