Python:concurrent.futures 如何使其可取消?
Posted
技术标签:
【中文标题】Python:concurrent.futures 如何使其可取消?【英文标题】:Python: concurrent.futures How to make it cancelable? 【发布时间】:2017-08-04 14:37:48 【问题描述】:Python concurrent.futures 和 ProcessPoolExecutor 提供了一个简洁的界面来安排和监控任务。期货甚至 provide 一个 .cancel() 方法:
cancel():尝试取消通话。如果调用当前正在执行且无法取消,则该方法将返回 False,否则该调用将被取消,该方法将返回 True。
不幸的是,在类似的question(关于 asyncio)中,答案声称使用此文档片段无法取消正在运行的任务,但只有当它们正在运行且不可取消时,文档才这么说。
向进程提交 multiprocessing.Events 也不是一件容易的事(通过参数这样做,如在 multiprocess.Process 中返回一个 RuntimeError)
我想做什么?我想对搜索空间进行分区并为每个分区运行一个任务。但是拥有一个解决方案就足够了,而且该过程是 CPU 密集型的。那么,有没有一种真正舒适的方法来实现这一点,并且不会通过使用 ProcessPool 来抵消收益?
例子:
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 135135515:
return elem
return False
futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps)
futures.append(pool.submit(m_run, partition))
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
for d in done:
print(d.result())
print("---")
for d in not_done:
# will return false for Cancel and Result for all futures
print("Cancel: "+str(d.cancel()))
print("Result: "+str(d.result()))
【问题讨论】:
您可以尝试将Event
设置为全局变量,而不是将其作为参数传递,参见***.com/questions/1675766/…
@niemmi 谢谢你的提示。我可能会尝试将此作为一种解决方法,因为它在调用不同模块时感觉设计得不好。
也许这一切都与没有立即取消 POSIX API 的事实有关:***.com/questions/2084830/…
【参考方案1】:
我不知道为什么concurrent.futures.Future
没有.kill()
方法,但是您可以通过使用pool.shutdown(wait=False)
关闭进程池并手动杀死剩余的子进程来完成您想要的操作。
创建一个杀死子进程的函数:
import signal, psutil
def kill_child_processes(parent_pid, sig=signal.SIGTERM):
try:
parent = psutil.Process(parent_pid)
except psutil.NoSuchProcess:
return
children = parent.children(recursive=True)
for process in children:
process.send_signal(sig)
运行你的代码直到你得到第一个结果,然后杀死所有剩余的子进程:
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 135135515:
return elem
return False
futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps)
futures.append(pool.submit(m_run, partition))
done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)
# Shut down pool
pool.shutdown(wait=False)
# Kill remaining child processes
kill_child_processes(os.getpid())
【讨论】:
【参考方案2】:很遗憾,运行Futures
无法取消。我认为核心原因是确保在不同的实现上使用相同的 API(不可能中断正在运行的线程或协程)。
Pebble 库旨在克服这一限制和其他限制。
from pebble import ProcessPool
def function(foo, bar=0):
return foo + bar
with ProcessPool() as pool:
future = pool.schedule(function, args=[1])
# if running, the container process will be terminated
# a new process will be started consuming the next task
future.cancel()
【讨论】:
我发现知道pebble
期货继承自concurrent.futures
期货很方便。因此,concurrent.futures
提供的许多方法也可以应用于pebble
期货,即使pebble
没有实现这些方法。这适用于例如对于concurrent.futures
的as_completed
方法。因此,切换到 pebble 可能就像添加导入并更改 ProcessPoolExecuter
和 pool.submit
的名称一样简单。
这可能很明显,但我只是想指出,如果您使用的是 ProcessPool,您将不再使用多个线程,而是使用多个进程。很多人不会关心区别,但至少知道你在做什么是值得的。【参考方案3】:
我发现你的问题很有趣,所以这是我的发现。
我发现.cancel()
方法的行为如 python 文档中所述。至于您正在运行的并发功能,不幸的是,即使他们被告知这样做,它们也无法取消。如果我的发现是正确的,那么我认为 Python 确实需要更有效的 .cancel() 方法。
运行下面的代码来检查我的发现。
from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time
# function that profits from partitioned search space
def m_run(partition):
for elem in partition:
if elem == 3351355150:
return elem
break #Added to terminate loop once found
return False
start = time()
futures = []
# used to create the partitions
steps = 1000000000
with ProcessPoolExecutor(max_workers=4) as pool:
for i in range(4):
# run 4 tasks with a partition, but only *one* solution is needed
partition = range(i*steps,(i+1)*steps)
futures.append(pool.submit(m_run, partition))
### New Code: Start ###
for f in as_completed(futures):
print(f.result())
if f.result():
print('break')
break
for f in futures:
print(f, 'running?',f.running())
if f.running():
f.cancel()
print('Cancelled? ',f.cancelled())
print('New Instruction Ended at = ', time()-start )
print('Total Compute Time = ', time()-start )
更新: 可以通过 bash 强制终止并发进程,但结果是主 python 程序也将终止。如果这不是您的问题,请尝试以下代码。
您必须在最后 2 个打印语句之间添加以下代码才能自己查看。注意:此代码仅在您没有运行任何其他 python3 程序时才有效。
import subprocess, os, signal
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],
stdout=subprocess.PIPE).stdout.decode('utf-8').split()
print ('result =', result)
for i in result:
print('PID = ', i)
if i != result[0]:
os.kill(int(i), signal.SIGKILL)
try:
os.kill(int(i), 0)
raise Exception("""wasn't able to kill the process
HINT:use signal.SIGKILL or signal.SIGABORT""")
except OSError as ex:
continue
【讨论】:
以上是关于Python:concurrent.futures 如何使其可取消?的主要内容,如果未能解决你的问题,请参考以下文章
python的multiprocessing和concurrent.futures有啥区别?
Python:Concurrent.Futures 错误 [TypeError:'NoneType' 对象不可调用]