Python ThreadPoolExecutor - 回调是不是保证与提交的函数在同一个线程中运行?
Posted
技术标签:
【中文标题】Python ThreadPoolExecutor - 回调是不是保证与提交的函数在同一个线程中运行?【英文标题】:Python ThreadPoolExecutor - is the callback guaranteed to run in the same thread as submitted func?Python ThreadPoolExecutor - 回调是否保证与提交的函数在同一个线程中运行? 【发布时间】:2014-11-19 05:16:30 【问题描述】:在 ThreadPoolExecutor (TPE) 中,回调是否总是保证与提交的函数在同一个线程中运行?
例如,我使用以下代码对此进行了测试。我运行了很多次,似乎func
和callback
总是在同一个线程中运行。
import concurrent.futures
import random
import threading
import time
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
def func(x):
time.sleep(random.random())
return threading.current_thread().name
def callback(future):
time.sleep(random.random())
x = future.result()
cur_thread = threading.current_thread().name
if (cur_thread != x):
print(cur_thread, x)
print('main thread: %s' % threading.current_thread())
for i in range(10000):
future = executor.submit(func, i)
future.add_done_callback(callback)
但是,当我删除 time.sleep(random.random())
语句时,它似乎失败了,即至少有几个 func
函数和 callbacks
没有在同一个线程中运行。
对于我正在处理的项目,回调必须始终与提交的函数在同一线程上运行,因此我想确保 TPE 能保证这一点。 (而且没有随机睡眠的测试结果也令人费解)。
我查看了source code for executors,似乎我们在运行回调之前没有将线程切换到主线程。但只是想确定一下。
【问题讨论】:
【参考方案1】:文档不保证在哪个线程中运行回调。The only documented guarantee 是回调将在属于添加回调的进程的线程中运行,但可以是任何线程,因为您使用的是 ThreadPoolExecutor ProcessPoolExecutor 的:
添加的可调用对象按照它们添加的顺序被调用,并且总是在属于添加它们的进程的线程中调用。
在当前的 ThreadPoolExecutor 实现中,回调执行的线程取决于添加回调时 Future
的状态,以及 Future
是否被取消。这些是实现细节;你不应该依赖它们,因为它们在不同的 Python 实现或不同的版本中可能会有所不同,并且它们可能会更改,恕不另行通知。
如果您在Future
完成后添加回调,则回调将在您调用add_done_callback
的任何线程中执行。您可以通过查看add_done_callback
源代码来了解这一点:
def add_done_callback(self, fn):
"""Attaches a callable that will be called when the future finishes.
Args:
fn: A callable that will be called with this future as its only
argument when the future completes or is cancelled. The callable
will always be called by a thread in the same process in which
it was added. If the future has already completed or been
cancelled then the callable will be called immediately. These
callables are called in the order that they were added.
"""
with self._condition:
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
fn(self)
如果Future
的状态表明它被取消或完成,fn
会立即在当前执行线程中调用。否则,它会添加到内部回调列表中,以便在 Future
完成时运行。
例如:
>>> def func(*args):
... time.sleep(5)
... print("func ".format(threading.current_thread()))
>>> def cb(a): print("cb ".format(threading.current_thread()))
...
>>> fut = ex.submit(func)
>>> func <Thread(Thread-1, started daemon 140084551563008)>
>>> fut = e.add_done_callback(cb)
cb <_MainThread(MainThread, started 140084622018368)>
如果一个future被一个成功的cancel
调用取消了,那么执行取消的线程会立即调用所有的回调:
def cancel(self):
"""Cancel the future if possible.
Returns True if the future was cancelled, False otherwise. A future
cannot be cancelled if it is running or has already completed.
"""
with self._condition:
if self._state in [RUNNING, FINISHED]:
return False
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
return True
self._state = CANCELLED
self._condition.notify_all()
self._invoke_callbacks()
return True
否则,执行未来任务的线程会调用回调。
【讨论】:
那么ProcessPoolExecutor
呢?回调的get_ident
也不同以上是关于Python ThreadPoolExecutor - 回调是不是保证与提交的函数在同一个线程中运行?的主要内容,如果未能解决你的问题,请参考以下文章
[python] ThreadPoolExecutor线程池 python 线程池
如何在 python 3 中将队列与并发未来的 ThreadPoolExecutor 一起使用?
python爬虫 threading 多线程 ThreadPoolExecutor线程池
python线程池ThreadPoolExecutor.submit的数据丢失问题