Python:concurrent.futures 如何使其可取消?

Posted

技术标签:

【中文标题】Python:concurrent.futures 如何使其可取消?【英文标题】:Python: concurrent.futures How to make it cancelable? 【发布时间】:2017-08-04 14:37:48 【问题描述】:

Python concurrent.futures 和 ProcessPoolExecutor 提供了一个简洁的界面来安排和监控任务。期货甚至 provide 一个 .cancel() 方法:

cancel():尝试取消通话。如果调用当前正在执行且无法取消,则该方法将返回 False,否则该调用将被取消,该方法将返回 True。

不幸的是,在类似的question(关于 asyncio)中,答案声称使用此文档片段无法取消正在运行的任务,但只有当它们正在运行且不可取消时,文档才这么说。

向进程提交 multiprocessing.Events 也不是一件容易的事(通过参数这样做,如在 multiprocess.Process 中返回一个 RuntimeError)

我想做什么?我想对搜索空间进行分区并为每个分区运行一个任务。但是拥有一个解决方案就足够了,而且该过程是 CPU 密集型的。那么,有没有一种真正舒适的方法来实现这一点,并且不会通过使用 ProcessPool 来抵消收益?

例子:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    for d in done:
        print(d.result())

    print("---")
    for d in not_done:
        # will return false for Cancel and Result for all futures
        print("Cancel: "+str(d.cancel()))
        print("Result: "+str(d.result()))

【问题讨论】:

您可以尝试将Event 设置为全局变量,而不是将其作为参数传递,参见***.com/questions/1675766/… @niemmi 谢谢你的提示。我可能会尝试将此作为一种解决方法,因为它在调用不同模块时感觉设计得不好。 也许这一切都与没有立即取消 POSIX API 的事实有关:***.com/questions/2084830/… 【参考方案1】:

我不知道为什么concurrent.futures.Future 没有.kill() 方法,但是您可以通过使用pool.shutdown(wait=False) 关闭进程池并手动杀死剩余的子进程来完成您想要的操作。

创建一个杀死子进程的函数:

import signal, psutil

def kill_child_processes(parent_pid, sig=signal.SIGTERM):
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return
    children = parent.children(recursive=True)
    for process in children:
        process.send_signal(sig)

运行你的代码直到你得到第一个结果,然后杀死所有剩余的子进程:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
    # run 4 tasks with a partition, but only *one* solution is needed
    partition = range(i*steps,(i+1)*steps)
    futures.append(pool.submit(m_run, partition))

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

# Shut down pool
pool.shutdown(wait=False)

# Kill remaining child processes
kill_child_processes(os.getpid())

【讨论】:

【参考方案2】:

很遗憾,运行Futures 无法取消。我认为核心原因是确保在不同的实现上使用相同的 API(不可能中断正在运行的线程或协程)。

Pebble 库旨在克服这一限制和其他限制。

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()  

【讨论】:

我发现知道pebble 期货继承自concurrent.futures 期货很方便。因此,concurrent.futures 提供的许多方法也可以应用于pebble 期货,即使pebble 没有实现这些方法。这适用于例如对于concurrent.futuresas_completed 方法。因此,切换到 pebble 可能就像添加导入并更改 ProcessPoolExecuterpool.submit 的名称一样简单。 这可能很明显,但我只是想指出,如果您使用的是 ProcessPool,您将不再使用多个线程,而是使用多个进程。很多人不会关心区别,但至少知道你在做什么是值得的。【参考方案3】:

我发现你的问题很有趣,所以这是我的发现。

我发现.cancel() 方法的行为如 python 文档中所述。至于您正在运行的并发功能,不幸的是,即使他们被告知这样做,它们也无法取消。如果我的发现是正确的,那么我认为 Python 确实需要更有效的 .cancel() 方法。

运行下面的代码来检查我的发现。

from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time 

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 3351355150:
            return elem
            break #Added to terminate loop once found
    return False

start = time()
futures = []
# used to create the partitions
steps = 1000000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    ### New Code: Start ### 
    for f in as_completed(futures):
        print(f.result())
        if f.result():
            print('break')
            break

    for f in futures:
        print(f, 'running?',f.running())
        if f.running():
            f.cancel()
            print('Cancelled? ',f.cancelled())

    print('New Instruction Ended at = ', time()-start )
print('Total Compute Time = ', time()-start )

更新: 可以通过 bash 强制终止并发进程,但结果是主 python 程序也将终止。如果这不是您的问题,请尝试以下代码。

您必须在最后 2 个打印语句之间添加以下代码才能自己查看。注意:此代码仅在您没有运行任何其他 python3 程序时才有效。

import subprocess, os, signal 
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],
                        stdout=subprocess.PIPE).stdout.decode('utf-8').split()
print ('result =', result)
for i in result:
    print('PID = ', i)
    if i != result[0]:
        os.kill(int(i), signal.SIGKILL)
        try: 
           os.kill(int(i), 0)
           raise Exception("""wasn't able to kill the process 
                              HINT:use signal.SIGKILL or signal.SIGABORT""")
        except OSError as ex:
           continue

【讨论】:

以上是关于Python:concurrent.futures 如何使其可取消?的主要内容,如果未能解决你的问题,请参考以下文章

python concurrent.futures

python的multiprocessing和concurrent.futures有啥区别?

Python:Concurrent.Futures 错误 [TypeError:'NoneType' 对象不可调用]

python并发模块之concurrent.futures

python简单粗暴多线程之concurrent.futures

Python:inotify、concurrent.futures - 如何添加现有文件