Python 中如何快速实现一个线程池?

Posted Python猫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python 中如何快速实现一个线程池?相关的知识,希望对你有一定的参考价值。

来源:古明地觉的编程教室

当有多个 IO 密集型的任务要被处理时,我们自然而然会想到多线程。但如果任务非常多,我们不可能每一个任务都启动一个线程去处理,这个时候最好的办法就是实现一个线程池,至于池子里面的线程数量可以根据业务场景进行设置。

比如我们实现一个有 10 个线程的线程池,这样可以并发地处理 10 个任务,每个线程将任务执行完之后,便去执行下一个任务。通过使用线程池,可以避免因线程创建过多而导致资源耗尽,而且任务在执行时的生命周期也可以很好地把控。

而线程池的实现方式也很简单,但这里我们不打算手动实现,因为 Python 提供了一个标准库 concurrent.futures,已经内置了对线程池的支持。所以本篇文章,我们就来详细介绍一下该模块的用法。

Future 对象

当我们往线程池里面提交一个函数时,会分配一个线程去执行,同时立即返回一个 Future 对象。通过 Future 对象可以监控函数的执行状态,有没有出现异常,以及有没有执行完毕等等。如果函数执行完毕,内部便会调用 future.set_result 将返回值设置到 future 里面,然后外界便可调用 future.result 拿到返回值。

除此之外 future 还可以绑定回调,一旦函数执行完毕,就会以 future 为参数,自动触发回调。所以 future 被称为未来对象,可以把它理解为函数的一个容器,当我们往线程池提交一个函数时,会立即创建相应的 future 然后返回。函数的执行状态什么的,都通过 future 来查看,当然也可以给它绑定一个回调,在函数执行完毕时自动触发。

那么下面我们就来看一下 future 的用法,文字的话理解起来可能有点枯燥。

"""
将函数提交到线程池里面运行时,会立即返回一个对象
这个对象就叫做 Future 对象,里面包含了函数的执行状态等等
当然我们也可以手动创建一个Future对象。
"""
from concurrent.futures import Future

# 创建 Future 对象 future
future = Future()

# 给 future 绑定回调
def callback(f: Future):
    print("当set_result的时候会执行回调,result:",
          f.result())

future.add_done_callback(callback)
# 通过 add_done_callback 方法即可给 future 绑定回调
# 调用的时候会自动将 future 作为参数
# 如果需要多个参数,那么就使用偏函数

# 回调函数什么时候执行呢?
# 显然是当 future 执行 set_result 的时候
# 如果 future 是向线程池提交函数时返回的
# 那么当函数执行完毕时会自动执行 future.set_result(xx)
# 并将自身的返回设置进去
# 而这里的 future 是我们手动创建的,因此需要手动执行
future.set_result("嘿嘿")
"""
当set_result的时候会执行回调,result: 嘿嘿
"""

需要注意的是:只能执行一次 set_result,但是可以多次调用 result 获取结果。

from concurrent.futures import Future

future = Future()
future.set_result("哼哼")

print(future.result())  # 哼哼
print(future.result())  # 哼哼
print(future.result())  # 哼哼

执行 future.result() 之前一定要先 set_result,否则会一直处于阻塞状态。当然 result 方法还可以接收一个 timeout 参数,表示超时时间,如果在指定时间内没有获取到值就会抛出异常。

提交函数自动创建 Future 对象

我们上面是手动创建的 Future 对象,但工作中很少会手动创建。我们将函数提交到线程池里面运行的时候,会自动创建 Future 对象并返回。这个 Future 对象里面就包含了函数的执行状态,比如此时是处于暂停、运行中还是完成等等,并且函数在执行完毕之后,还会调用 future.set_result 将自身的返回值设置进去。

from concurrent.futures import ThreadPoolExecutor
import time

def task(name, n):
    time.sleep(n)
    return f"name 睡了 n 秒"

# 创建一个线程池
# 里面还可以指定 max_workers 参数,表示最多创建多少个线程
# 如果不指定,那么每提交一个函数,都会为其创建一个线程
executor = ThreadPoolExecutor()

# 通过 submit 即可将函数提交到线程池,一旦提交,就会立刻运行
# 因为开启了一个新的线程,主线程会继续往下执行
# 至于 submit 的参数,按照函数名,对应参数提交即可
# 切记不可写成task("古明地觉", 3),这样就变成调用了
future = executor.submit(task, "古明地觉", 3)

# 由于函数里面出现了 time.sleep,并且指定的 n 是 3
# 所以函数内部会休眠 3 秒,显然此时处于运行状态
print(future)
"""
<Future at 0x7fbf701726d0 state=running>
"""

# 我们说 future 相当于一个容器,包含了内部函数的执行状态
# 函数是否正在运行中
print(future.running())
"""
True
"""
# 函数是否执行完毕
print(future.done())
"""
False
"""

# 主程序也 sleep 3 秒
time.sleep(3)

# 显然此时函数已经执行完毕了
# 并且打印结果还告诉我们返回值类型是 str
print(future)
"""
<Future at 0x7fbf701726d0 state=finished returned str>
"""

print(future.running())
"""
False
"""
print(future.done())
"""
True
"""

# 函数执行完毕时,会将返回值设置在 future 里
# 也就是说一旦执行了 future.set_result
# 那么就表示函数执行完毕了,然后外界可以调用 result 拿到返回值
print(future.result())
"""
古明地觉 睡了 3 秒
"""

这里再强调一下 future.result(),这一步是会阻塞的,举个例子:

# 提交函数
future = executor.submit(task, "古明地觉", 3)
start = time.perf_counter()
future.result()
end = time.perf_counter()
print(end - start)  # 3.00331525

可以看到,future.result() 这一步花了将近 3s。其实也不难理解,future.result() 是干嘛的?就是为了获取函数的返回值,可函数都还没有执行完毕,它又从哪里获取呢?所以只能先等待函数执行完毕,将返回值通过 set_result 设置到 future 里面之后,外界才能调用 future.result() 获取到值。

如果不想一直等待的话,那么在获取值的时候可以传入一个超时时间。

from concurrent.futures import (
    ThreadPoolExecutor,
    TimeoutError
)
import time

def task(name, n):
    time.sleep(n)
    return f"name 睡了 n 秒"

executor = ThreadPoolExecutor()
future = executor.submit(task, "古明地觉", 3)
try:
    # 1 秒之内获取不到值,抛出 TimeoutError
    res = future.result(1)
except TimeoutError:
    pass

# 再 sleep 2 秒,显然函数执行完毕了
time.sleep(2)
# 获取返回值
print(future.result())
"""
古明地觉 睡了 3 秒
"""

当然啦,这么做其实还不够智能,因为我们不知道函数什么时候执行完毕。所以最好的办法还是绑定一个回调,当函数执行完毕时,自动触发回调。

from concurrent.futures import ThreadPoolExecutor
import time

def task(name, n):
    time.sleep(n)
    return f"name 睡了 n 秒"

def callback(f):
    print(f.result())

executor = ThreadPoolExecutor()
future = executor.submit(task, "古明地觉", 3)
# 绑定回调,3 秒之后自动调用
future.add_done_callback(callback)
"""
古明地觉 睡了 3 秒
"""

需要注意的是,在调用 submit 方法之后,提交到线程池的函数就已经开始执行了。而不管函数有没有执行完毕,我们都可以给对应的 future 绑定回调。

如果函数完成之前添加回调,那么会在函数完成后触发回调。如果函数完成之后添加回调,由于函数已经完成,代表此时的 future 已经有值了,或者说已经 set_result 了,那么会立即触发回调。

future.set_result 到底干了什么事情

当函数执行完毕之后,会执行 set_result,那么这个方法到底干了什么事情呢?

我们看到 future 有两个被保护的属性,分别是 _result 和 _state。显然 _result 用于保存函数的返回值,而 future.result() 本质上也是返回 _result 属性的值。而 _state 属性则用于表示函数的执行状态,初始为 PENDING,执行中为 RUNING,执行完毕时被设置为 FINISHED。

调用 future.result() 的时候,会判断 _state 的属性,如果还在执行中就一直等待。当 _state 为 FINISHED 的时候,就返回 _result 属性的值。

提交多个函数

我们上面每次只提交了一个函数,但其实可以提交任意多个,我们来看一下:

from concurrent.futures import ThreadPoolExecutor
import time

def task(name, n):
    time.sleep(n)
    return f"name 睡了 n 秒"

executor = ThreadPoolExecutor()
futures = [executor.submit(task, "古明地觉", 3),
           executor.submit(task, "古明地觉", 4),
           executor.submit(task, "古明地觉", 1)]
# 此时都处于running
print(futures)
"""
[<Future at 0x1b5ff622550 state=running>,
 <Future at 0x1b5ff63ca60 state=running>, 
 <Future at 0x1b5ff63cdf0 state=running>]
"""

time.sleep(3)
# 主程序 sleep 3s 后
# futures[0]和futures[2]处于 finished
# futures[1]仍处于 running
print(futures)
"""
[<Future at 0x1b5ff622550 state=running>, 
 <Future at 0x1b5ff63ca60 state=running>, 
 <Future at 0x1b5ff63cdf0 state=finished returned str>]
"""

如果是多个函数,要如何拿到返回值呢?很简单,遍历 futures 即可。

executor = ThreadPoolExecutor()
futures = [executor.submit(task, "古明地觉", 5),
           executor.submit(task, "古明地觉", 2),
           executor.submit(task, "古明地觉", 4),
           executor.submit(task, "古明地觉", 3),
           executor.submit(task, "古明地觉", 6)]

for future in futures:
    print(future.result())
"""
古明地觉 睡了 5 秒
古明地觉 睡了 2 秒
古明地觉 睡了 4 秒
古明地觉 睡了 3 秒
古明地觉 睡了 6 秒
"""

这里面有一些值得说一说的地方,首先 futures 里面有 5 个 future,记做 future1, future2, future3, future4, future5。

当使用 for 循环遍历的时候,实际上会依次遍历这 5 个 future,所以返回值的顺序就是我们添加的函数的顺序。由于 future1 对应的函数休眠了 5s,那么必须等到 5s 后,future1 里面才会有值。

但这五个函数是并发执行的,future2, future3, future4 由于只休眠了 2s, 4s, 3s,所以肯定会先执行完毕,然后执行 set_result,将返回值设置到对应的 future 里。

但 Python 的 for 循环不可能在第一次迭代还没有结束,就去执行第二次迭代。因为 futures 里面的几个 future 的顺序已经一开始就被定好了,只有当第一个 future.result() 执行完成之后,才会执行第二个 future.result(),以及第三个、第四个。

因此即便后面的函数已经执行完毕,但由于 for 循环的顺序,也只能等着,直到前面的 future.result() 执行完毕。所以当第一个 future.result() 结束时,后面三个 future.result() 会立刻输出,因为它们内部的函数已经执行结束了。

而最后一个 future,由于内部函数 sleep 了 6 秒,因此要再等待 1 秒,才会打印 future.result()。

使用 map 来提交多个函数

使用 submit 提交函数会返回一个 future,并且还可以给 future 绑定一个回调。但如果不关心回调的话,那么还可以使用 map 进行提交。

executor = ThreadPoolExecutor()
# map 内部也是使用了 submit
results = executor.map(task,
                       ["古明地觉"] * 3,
                       [3, 1, 2])
# 并且返回的是迭代器
print(results)
"""
<generator object ... at 0x0000022D78EFA970>
"""

# 此时遍历得到的是不再是 future
# 而是 future.result()
for result in results:
    print(result)
"""
古明地觉 睡了 3 秒
古明地觉 睡了 1 秒
古明地觉 睡了 2 秒
"""

可以看到,当使用for循环的时候,map 执行的逻辑和 submit 是一样的。唯一的区别是,此时不需要再调用 result 了,因为返回的就是函数的返回值。

或者我们直接调用 list 也行。

executor = ThreadPoolExecutor()
results = executor.map(task,
                       ["古明地觉"] * 3,
                       [3, 1, 2])
print(list(results))
"""
['古明地觉 睡了 3 秒', 
 '古明地觉 睡了 1 秒', 
 '古明地觉 睡了 2 秒']
"""

results 是一个生成器,调用 list 的时候会将里面的值全部产出。由于 map 内部还是使用的 submit,然后通过 future.result() 拿到返回值,而耗时最长的函数需要 3 秒,因此这一步会阻塞 3 秒。3 秒过后,会打印所有函数的返回值。

按照顺序等待执行

上面在获取返回值的时候,是按照函数的提交顺序获取的。如果我希望哪个函数先执行完毕,就先获取哪个函数的返回值,该怎么做呢?

from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed
)
import time

def task(name, n):
    time.sleep(n)
    return f"name 睡了 n 秒"

executor = ThreadPoolExecutor()
futures = [executor.submit(task, "古明地觉", 5),
           executor.submit(task, "古明地觉", 2),
           executor.submit(task, "古明地觉", 1),
           executor.submit(task, "古明地觉", 3),
           executor.submit(task, "古明地觉", 4)]
for future in as_completed(futures):
    print(future.result())
"""
古明地觉 睡了 1 秒
古明地觉 睡了 2 秒
古明地觉 睡了 3 秒
古明地觉 睡了 4 秒
古明地觉 睡了 5 秒
"""

此时谁先完成,谁先返回。

取消一个函数的执行

我们通过 submit 可以将函数提交到线程池中执行,但如果我们想取消该怎么办呢?

executor = ThreadPoolExecutor()
future1 = executor.submit(task, "古明地觉", 1)
future2 = executor.submit(task, "古明地觉", 2)
future3 = executor.submit(task, "古明地觉", 3)
# 取消函数的执行
# 会将 future 的 _state 属性设置为 CANCELLED
future3.cancel()
# 查看是否被取消
print(future3.cancelled())  # False

问题来了,调用 cancelled 方法的时候,返回的是False,这是为什么?很简单,因为函数已经被提交到线程池里面了,函数已经运行了。而只有在还没有运行时,取消才会成功。

可这不矛盾了吗?函数一旦提交就会运行,只有不运行才会取消成功,这怎么办?还记得线程池的一个叫做 max_workers 的参数吗?用来控制线程池内的线程数量,我们可以将最大的线程数设置为2,那么当第三个函数进去的时候,就不会执行了,而是处于暂停状态。

executor = ThreadPoolExecutor(max_workers=2)
future1 = executor.submit(task, "古明地觉", 1)
future2 = executor.submit(task, "古明地觉", 2)
future3 = executor.submit(task, "古明地觉", 3)
# 如果池子里可以创建空闲线程
# 那么函数一旦提交就会运行,状态为 RUNNING
print(future1._state)  # RUNNING
print(future2._state)  # RUNNING
# 但 future3 内部的函数还没有运行
# 因为池子里无法创建新的空闲线程了,所以状态为 PENDING
print(future3._state)  # PENDING
# 取消函数的执行,前提是函数没有运行
# 会将 future 的 _state 属性设置为 CANCELLED
future3.cancel()
# 查看是否被取消
print(future3.cancelled())  # True
print(future3._state)  # CANCELLED

在启动线程池的时候,肯定是需要设置容量的,不然处理几千个函数要开启几千个线程吗。另外当函数被取消了,就不可以再调用 future.result() 了,否则的话会抛出 CancelledError。

函数执行时出现异常

我们前面的逻辑都是函数正常执行的前提下,但天有不测风云,如果函数执行时出现异常了该怎么办?

from concurrent.futures import ThreadPoolExecutor

def task1():
    1 / 0

def task2():
    pass


executor = ThreadPoolExecutor(max_workers=2)
future1 = executor.submit(task1)
future2 = executor.submit(task2)
print(future1)
print(future2)
"""
<Future at 0x7fe3e00f9e50 state=finished raised ZeroDivisionError>
<Future at 0x7fe3e00f9eb0 state=finished returned NoneType>
"""

# 结果显示 task1 函数执行出现异常了
# 那么这个异常要怎么获取呢?
print(future1.exception())
print(future1.exception().__class__)
"""
division by zero
<class 'ZeroDivisionError'>
"""

# 如果执行没有出现异常,那么 exception 方法返回 None
print(future2.exception())  # None

# 注意:如果函数执行出现异常了
# 那么调用 result 方法会将异常抛出来
future1.result()
"""
Traceback (most recent call last):
  File "...", line 4, in task1
    1 / 0
ZeroDivisionError: division by zero
"""

出现异常时,调用 future.set_exception 将异常设置到 future 里面,而 future 有一个 _exception 属性,专门保存设置的异常。当调用 future.exception() 时,也会直接返回 _exception 属性的值。

等待所有函数执行完毕

假设我们往线程池提交了很多个函数,如果希望提交的函数都执行完毕之后,主程序才能往下执行,该怎么办呢?其实方案有很多:

第一种:

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(n)
    return f"sleep n"

executor = ThreadPoolExecutor()

future1 = executor.submit(task, 5)
future2 = executor.submit(task, 2)
future3 = executor.submit(task, 4)

# 这里是不会阻塞的
print("start")
# 遍历所有的 future,并调用其 result 方法
# 这样就会等到所有的函数都执行完毕之后才会往下走
for future in [future1, future2, future3]:
    print(future.result())
print("end")
"""
start
sleep 5
sleep 2
sleep 4
end
"""

第二种:

from concurrent.futures import (
    ThreadPoolExecutor,
    wait
)
import time

def task(n):
    time.sleep(n)
    return f"sleep n"

executor = ThreadPoolExecutor()

future1 = executor.submit(task, 5)
future2 = executor.submit(task, 2)
future3 = executor.submit(task, 4)

# return_when 有三个可选参数
# FIRST_COMPLETED:当任意一个任务完成或者取消
# FIRST_EXCEPTION:当任意一个任务出现异常
#                  如果都没出现异常等同于ALL_COMPLETED
# ALL_COMPLETED:所有任务都完成,默认是这个值
fs = wait([future1, future2, future3],
          return_when="ALL_COMPLETED")
# 此时返回的fs是DoneAndNotDoneFutures类型的namedtuple
# 里面有两个值,一个是done,一个是not_done
print(fs.done)
"""
<Future at 0x1df1400 state=finished returned str>, 
 <Future at 0x2f08e48 state=finished returned str>, 
 <Future at 0x9f7bf60 state=finished returned str>
"""

print(fs.not_done)
"""
set()
"""
for f in fs.done:
    print(f.result())
"""
start
sleep 5
sleep 2
sleep 4
end
"""

第三种:

# 使用上下文管理
with ThreadPoolExecutor() as executor:
    future1 = executor.submit(task, 5)
    future2 = executor.submit(task, 2)
    future3 = executor.submit(task, 4)

# 所有函数执行完毕(with语句结束)后才会往下执行

第四种:

executor = ThreadPoolExecutor()

future1 = executor.submit(task, 5)
future2 = executor.submit(task, 2)
future3 = executor.submit(task, 4)
# 所有函数执行结束后,才会往下执行
executor.shutdown()

小结

如果我们需要启动多线程来执行函数的话,那么不妨使用线程池。每调用一个函数就从池子里面取出一个线程,函数执行完毕就将线程放回到池子里以便其它函数执行。如果池子里面空了,或者说无法创建新的空闲线程,那么接下来的函数就只能处于等待状态了。

最后,concurrent.futures 不仅可以用于实现线程池,还可以用于实现进程池。两者的 API 是一样的:

from concurrent.futures import ProcessPoolExecutor
import time

def task(n):
    time.sleep(n)
    return f"sleep n"

executor = ProcessPoolExecutor()
# Windows 上需要加上这一行
if __name__ == '__main__':
    future1 = executor.submit(task, 5)
    future2 = executor.submit(task, 2)
    future3 = executor.submit(task, 4)
    executor.shutdown()
    print(future1.result())
    print(future2.result())
    print(future3.result())
"""
sleep 5
sleep 2
sleep 4
"""

线程池和进程池的 API 是一致的,但工作中很少会创建进程池。

Python猫技术交流群开放啦!群里既有国内一二线大厂在职员工,也有国内外高校在读学生,既有十多年码龄的编程老鸟,也有中小学刚刚入门的新人,学习氛围良好!想入群的同学,请在公号内回复『交流群』,获取猫哥的微信(谢绝广告党,非诚勿扰!)~

还不过瘾?试试它们

7 行代码搞崩溃 B 站,原因令人唏嘘!

与 Python 之父聊天:更快的 Python!

Python 强大的信号库 blinker 入门教程

写好 Python 代码的几条重要技巧

Python 优化提速的 8 个小技巧

一份可以令 Python 变快的工具清单

如果你觉得本文有帮助

请慷慨分享点赞,感谢啦

以上是关于Python 中如何快速实现一个线程池?的主要内容,如果未能解决你的问题,请参考以下文章

线程池与Python中的GIL

线程池实现原理及案列

如何优雅的使用和理解线程池----转

python之进程池与线程池

python 之 并发编程(进程池与线程池同步异步阻塞非阻塞线程queue)

如何优雅的使用和理解线程池