Python concurrent库简单用法

Posted tsuko

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python concurrent库简单用法相关的知识,希望对你有一定的参考价值。

内容:

  • concurrent库简介

  • ThreadPoolExecutor.map()

  • ThreadPoolExecutor.submit()和.as_completed()

concurrent库简介

  Python 3.4+的concurrent库用于多线程、并发编程,抽象层次高,使用方便。用得最多的是concurrent里面的futures类,futures又有ThreadPoolExecutorProcessPoolExcutor两个子类,前者用于多线程,后者用于多进程。

 

ThreadPoolExecutor.map()

  ThreadPoolExcecutor类使用见下面代码:

技术分享图片
from concurrent import futures
import time

def f(x):
    return x*(8-x)


def test_func(n):
    print(f"{n} starts!")
    for i in range(n):
        time.sleep(0.5)
    return f"n = {n} completed!"


def test(max_workers=4):
    with futures.ThreadPoolExecutor(max_workers) as executor:
        list1 = [ f(x) for x in range(1, max_workers+1)]
        # it is a iterator of results.
        it = executor.map(test_func, list1)
        print(it)
        for x in  it:
            print(x)

if __name__ == "__main__":
    m_workers = 6
    test(m_workers)
Example

  解释:上面的例子中,对futures.ThreadPoolExecutor(也就是executor)调用map方法,这个map同内置的map用法一样,第一个参数为只接受一个参数的函数,后一个为可迭代对象。不同的是,这个map方法会把对函数的调用映射到到多个线程中。并返回一个future的迭代器。

  其中,f(x)是我特地构造的非单调函数,它返回一个值,决定了test_func()的睡眠时间,也就决定了线程执行完成的先后。

  以下是上面例子的结果:

技术分享图片
7 starts!
12 starts!
15 starts!
16 starts!
15 starts!
12 starts!
<generator object Executor.map.<locals>.result_iterator at 0x000002093367AA98>
n = 7 completed!
n = 12 completed!
n = 15 completed!
n = 16 completed!
n = 15 completed!
n = 12 completed!
Result1

  上面反映出迭代器it是依照线程启动的顺序来迭代结果的,而非线程完成的先后顺序。it中的元素是各个线程(函数)返回的结果。在实际的运行过程中,it是逐渐等待线程完成(n = 7 - 16),然后几乎是同时把剩下的结果都打印出来。这说明前面(n = 7 ~ 16)的过程是阻塞的,而后面部分的线程已经全部执行完毕,没有阻塞。

  那么当len(list1) > max_workers的时候呢?稍微修改程序:list1 = [ f(x) for x in range(1, max_workers+10)。程序执行结果如下:

技术分享图片
7 starts!
12 starts!
15 starts!
16 starts!
15 starts!
12 starts!
<generator object Executor.map.<locals>.result_iterator at 0x0000029134D25A40> # print(it)
7 starts!
n = 7 completed!  # 开始打印结果了,然而还有线程刚开始
0 starts!
n = 12 completed! # 阻塞
-9 starts!
-20 starts!
-33 starts!
-48 starts!
-65 starts!
-84 starts!
-105 starts!
n = 15 completed! 
n = 16 completed!
n = 15 completed!
n = 12 completed!
n = 7 completed!
n = 0 completed!
n = -9 completed!
n = -20 completed!
n = -33 completed!
n = -48 completed!
n = -65 completed!
n = -84 completed!
n = -105 completed!
Result2

  那么这个修改后的Python代码执行过程又是如何呢?(未完)

  

ThreadPoolExecutor.submit()和futures.as_completed()

  .submit(func, *args)接受一个参数,*args的内容作为这个函数的参数,返回一个future对象。

  futures.as_completed()返回一个迭代器,跟上吗.map()不同,这个迭代器的迭代顺序依照future返回(线程结束)的顺序。代码如下:

技术分享图片
def test_2(max_workers=4):
    with futures.ThreadPoolExecutor(max_workers) as executor:
        list1 = [f(x) for x in range(1, max_workers+1)]
        it_ls = []
        for x in list1:
            future = executor.submit(test_func, x) # 返回一个future
            it_ls.append(future)
        done_iter = futures.as_completed(it_ls)  # as_completed()返回迭代器
        print(done_iter)
        for x in done_iter: # x是future对象
            print(x.result())
.submit() 和 .as_completed()

  程序执行结果如下:

技术分享图片
7 starts!
12 starts!
15 starts!
16 starts!
15 starts!
12 starts!
<generator object as_completed at 0x0000028AD6B79990>
n = 7 completed!
n = 12 completed!
n = 12 completed!
n = 15 completed!
n = 15 completed!
n = 16 completed!
Result_3

   

   

    

以上是关于Python concurrent库简单用法的主要内容,如果未能解决你的问题,请参考以下文章

python并发之concurrent.futures

Python3模块concurrent.futures模块,线程池进程池

Python3标准库:concurrent.futures管理并发任务池

python库 argparse的add_argument基础用法

使用concurrent.futures模块并发,实现进程池线程池

pqdm 是 tqdm 和 concurrent.futures 的 wrapper | 一个小而美的 Python 并行计算库 | 实现多进程显示进度条的优雅方案