一旦 Futures 开始,你如何杀死它们?

Posted

技术标签:

【中文标题】一旦 Futures 开始,你如何杀死它们?【英文标题】:How do you kill Futures once they have started? 【发布时间】:2015-05-24 12:13:27 【问题描述】:

我正在使用新的concurrent.futures 模块(它也有一个 Python 2 反向端口)来执行一些简单的多线程 I/O。我无法理解如何彻底终止使用此模块开始的任务。

查看以下 Python 2/3 脚本,它重现了我看到的行为:

#!/usr/bin/env python
from __future__ import print_function

import concurrent.futures
import time


def control_c_this():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future1 = executor.submit(wait_a_bit, name="Jack")
        future2 = executor.submit(wait_a_bit, name="Jill")

        for future in concurrent.futures.as_completed([future1, future2]):
            future.result()

        print("All done!")


def wait_a_bit(name):
    print("n is waiting...".format(n=name))
    time.sleep(100)


if __name__ == "__main__":
    control_c_this()

当此脚本运行时,使用常规的 Control-C 键盘中断似乎无法彻底终止。我在 OS X 上运行。

在 Python 2.7 上,我必须从命令行使用 kill 来终止脚本。 Control-C 会被忽略。 在 Python 3.4 上,如果按两次 Control-C,则可以正常工作,但随后会转储许多奇怪的堆栈跟踪。

我在网上找到的大多数文档都在讨论如何使用旧的 threading 模块彻底杀死线程。似乎都不适用于这里。

并且concurrent.futures 模块中提供的所有停止内容的方法(如Executor.shutdown()Future.cancel())仅在Futures 尚未开始或完成时才有效,在这种情况下这是没有意义的。我想立即打断Future。

我的用例很简单:当用户按下 Control-C 时,脚本应该像任何表现良好的脚本一样立即退出。这就是我想要的。

那么在使用concurrent.futures 时获得这种行为的正确方法是什么?

【问题讨论】:

阅读related question about Java,我发现杀死线程不是你通常会做的事情,因为它会使你的程序状态不一致。就我而言,我不认为这是一个问题,因为我只想退出整个程序。还提到了setting some shared variable,线程可以读取以了解何时自行终止。不确定这种方法是否适用于 Python。 请注意,Ctrl+Break 会起作用,即使 Ctrl+C 不起作用。 @jedwards - 使用 Python 2 我正在尝试 Command + 。 (这显然是 OS X 上的 Control + Break),它似乎不起作用。实际上,似乎等同于 Control + C。 【参考方案1】:

聚会迟到了,但我也遇到了同样的问题。

我想立即终止我的程序,我不在乎发生了什么。除了 Linux 的功能之外,我不需要彻底关机。

我发现将 KeyboardInterrupt 异常处理程序中的 geitda 代码替换为 os.kill(os.getpid(), 9) 在第一个 ^C 之后立即退出。

【讨论】:

请不要添加“谢谢”作为答案。一旦你有足够的reputation,你将能够vote up questions and answers,你觉得有帮助。 - From Review【参考方案2】:

我遇到了这个问题,但我遇到的问题是许多期货(成千上万的)将等待运行,只需按 Ctrl-C 就会让它们等待,而不是真正退出。我使用concurrent.futures.wait 运行进度循环,需要添加try ... except KeyboardInterrupt 来处理取消未完成的期货。

POLL_INTERVAL = 5
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    futures = [pool.submit(do_work, arg) for arg in large_set_to_do_work_over]
    # next line returns instantly
    done, not_done = concurrent.futures.wait(futures, timeout=0)
    try:
        while not_done:
            # next line 'sleeps' this main thread, letting the thread pool run
            freshly_done, not_done = concurrent.futures.wait(not_done, timeout=POLL_INTERVAL)
            done |= freshly_done
            # more polling stats calculated here and printed every POLL_INTERVAL seconds...
    except KeyboardInterrupt:
        # only futures that are not done will prevent exiting
        for future in not_done:
            # cancel() returns False if it's already done or currently running,
            # and True if was able to cancel it; we don't need that return value
            _ = future.cancel()
         # wait for running futures that the above for loop couldn't cancel (note timeout)
         _ = concurrent.futures.wait(not_done, timeout=None)

如果您有兴趣准确跟踪已完成和未完成的内容(即不想要进度循环),您可以替换第一个等待调用(与timeout=0) 与not_done = futures 并仍然保留while not_done: 逻辑。

for future in not_done: 取消循环可能会根据该返回值表现出不同的行为(或写为理解),但等待完成或取消的期货并不是真正的等待——它会立即返回。最后一个 waittimeout=None 确保池的运行作业确实完成。

同样,只有在实际调用的 do_work 最终在合理的时间内返回时,这才能正常工作。这对我来说很好 - 事实上,我想确保如果 do_work 开始,它会运行到完成。如果do_work 是“无尽的”,那么您将需要类似 cdosborn 的答案,它使用对所有线程可见的变量,指示它们自行停止。

【讨论】:

【参考方案3】:

有点痛苦。本质上,您的工作线程必须在您的主线程退出之前完成。除非他们退出,否则您无法退出。典型的解决方法是有一些全局状态,每个线程都可以检查以确定它们是否应该做更多的工作。

这是quote 解释原因。本质上,如果线程在解释器退出时退出,可能会发生不好的事情。

这是一个工作示例。请注意,由于子线程的睡眠持续时间,C-c 最多需要 1 秒才能传播。

#!/usr/bin/env python
from __future__ import print_function

import concurrent.futures
import time
import sys

quit = False
def wait_a_bit(name):
    while not quit:
        print("n is doing work...".format(n=name))
        time.sleep(1)

def setup():
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    future1 = executor.submit(wait_a_bit, "Jack")
    future2 = executor.submit(wait_a_bit, "Jill")

    # main thread must be doing "work" to be able to catch a Ctrl+C 
    # http://www.luke.maurits.id.au/blog/post/threads-and-signals-in-python.html
    while (not (future1.done() and future2.done())):
        time.sleep(1)

if __name__ == "__main__":
    try:
        setup()
    except KeyboardInterrupt:
        quit = True

【讨论】:

你不必睡觉。你只需要他们检查他们是否应该退出。 当我使用ThreadPoolExecutor 时,这个技巧对我有用,但不适用于ProcessPoolExecutor。尝试跨进程共享全局变量时是否有一些问题?我必须将quit 标志存储在磁盘上吗? 进程不共享变量,您必须使用队列或信号量进行通信。 @NickChammas AFAIK 睡眠通常不会消耗任何相关量的 CPU 时间。尝试创建 10k 线程,立即进入睡眠状态一天;在设置时间之后,您将看不到任何 CPU 使用率。所以在大多数应用程序中这应该没问题。 这个答案似乎缺少两个方面。 1.为什么反复按CTRL-C确实会更快地关闭所有内容。 2.在考虑信号时,您引用的引用有些不切实际的期望:“工作人员可能会在评估工作项目时被杀死,如果正在评估的可调用对象具有外部副作用,例如写入文件,这可能会很糟糕。”如果程序是单线程的,CTRL-C 通常会对整个执行产生这些影响。简单地将 SIGINT 从主线程传播到所有守护进程,然后加入()它们呢?

以上是关于一旦 Futures 开始,你如何杀死它们?的主要内容,如果未能解决你的问题,请参考以下文章

python 期物

concurrent.futures.as_completed 是如何工作的?

教你如何找到正在运行中的进程 ID 并杀死它

Python - 正确杀死/退出期货线程?

concurrent.futures 和 asyncio.futures 有啥区别?

如何杀死一个无法杀死的任务?