The workers in ThreadPoolExecutor are not really daemon threads

Posted: 2018-10-04 03:54:46

Question:

What I can't figure out is this: although ThreadPoolExecutor runs its workers as daemon threads, they keep running even after the main thread exits.

Here is a minimal example on Python 3.6.4:

import concurrent.futures
import time


def fn():
    while True:
        time.sleep(5)
        print("Hello")


thread_pool = concurrent.futures.ThreadPoolExecutor()
thread_pool.submit(fn)
while True:
    time.sleep(1)
    print("Wow")

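Both the main thread and the worker thread run infinite loops. So if I stop the main thread with a KeyboardInterrupt (Ctrl+C), I expect the whole program to terminate as well. In practice, though, the worker thread keeps running, even though it is a daemon thread.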

The ThreadPoolExecutor source confirms that the worker threads are daemon threads:

t = threading.Thread(target=_worker,
                     args=(weakref.ref(self, weakref_cb),
                           self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
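To check the flag on your own interpreter, here is a small diagnostic sketch. It peeks at the executor's private `_threads` set, which is an implementation detail rather than public API. On 3.6 it should report `True`, matching the snippet above; as far as I know, Python 3.9 stopped creating these workers as daemon threads, so there it reports `False`.

```python
import concurrent.futures
import sys


def worker_daemon_flags():
    """Return the daemon flag of each worker thread in a fresh pool.

    Reads the private ``_threads`` attribute, so treat this as a
    diagnostic sketch, not a stable API.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        # Workers are spawned lazily by submit(), so submit something first.
        pool.submit(lambda: None).result()
        return [t.daemon for t in pool._threads]


flags = worker_daemon_flags()
print(sys.version_info[:2], flags)
```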

Moreover, if I create a daemon thread manually, it works as expected (Ctrl+C terminates the whole program):

from threading import Thread
import time


def fn():
    while True:
        time.sleep(5)
        print("Hello")


thread = Thread(target=fn)
thread.daemon = True
thread.start()
while True:
    time.sleep(1)
    print("Wow")

So I really can't make sense of this strange behavior.

Answer 1:

Suddenly... I found the reason. More of the ThreadPoolExecutor source code explains it:

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()

atexit.register(_python_exit)

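There is an exit handler, registered with atexit, that puts a None sentinel on each worker's queue and then joins every outstanding worker. Because the worker in my example never returns to its queue, that join() blocks forever, so the process never exits even though the thread is a daemon.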

Comments:

So what good is daemon=True if it doesn't serve its main purpose?

Answer 2:

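Here is a way to avoid the problem. One bad design can be beaten by another bad design. People should only set daemon=True when they really know the worker won't corrupt any objects or files.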

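In my case, I create a ThreadPoolExecutor with a single worker, and right after one submit I remove the newly created thread from concurrent.futures.thread._threads_queues, so the interpreter does not wait for that thread to stop on its own. Note that the worker thread is created by submit, not when the ThreadPoolExecutor is initialized.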

import concurrent.futures.thread
from concurrent.futures import ThreadPoolExecutor

...

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(lambda: self._exec_file(args))
# Unregister the (only) worker so the exit handler won't join it.
del concurrent.futures.thread._threads_queues[list(executor._threads)[0]]

It works on Python 3.8, but may not work on 3.9+, because this code relies on private module internals (_threads_queues and _threads).

See the working code on GitHub.
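If you control the task, a version-independent alternative to deleting private state is cooperative cancellation: the task waits on a `threading.Event` and returns once it is set, and the main thread sets the event in a `finally` block before calling `shutdown()`. This is a different technique from the hack above, sketched under the assumption that `fn` can be modified; `stop_event`, `interval` and `run_for` are hypothetical names.

```python
import concurrent.futures
import threading
import time


def fn(stop_event, interval=5.0):
    """Like the question's fn, but returns once stop_event is set."""
    ticks = 0
    # Event.wait() returns True as soon as the event is set (False on timeout),
    # so the worker wakes immediately instead of finishing its sleep.
    while not stop_event.wait(interval):
        print("Hello")
        ticks += 1
    return ticks


def main(run_for=None):
    """Run the question's main loop; run_for (seconds) is a hypothetical
    parameter for bounded runs. With run_for=None it loops until Ctrl+C."""
    stop_event = threading.Event()
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, stop_event)
    deadline = None if run_for is None else time.monotonic() + run_for
    try:
        while deadline is None or time.monotonic() < deadline:
            time.sleep(1)
            print("Wow")
    finally:
        # Also runs on KeyboardInterrupt: signal the worker, then wait for it,
        # so the atexit join has nothing left to block on.
        stop_event.set()
        pool.shutdown(wait=True)
    return future.result()  # number of "Hello" lines the worker printed
```

Calling `main()` with no argument behaves like the question's loop, but Ctrl+C now ends the process promptly, on any Python version, because the worker returns instead of being killed or joined forever.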
