python并发之concurrent.futures

Posted ExplorerMan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python并发之concurrent.futures相关的知识,希望对你有一定的参考价值。

concurrent:并发

  Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutorProcessPoolExecutor两个类,实现了对threadingmultiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。 
concurrent.futures基础模块是executor和future。

  Executor  

  Executor是一个抽象类,它不能被直接使用。它为具体的异步执行定义了一些基本的方法。 ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。

  submit方法

  Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future对象代表的就是给定的调用。

  我们使用submit方法来往线程池中加入一个task,submit返回一个Future对象,对于Future对象可以简单地理解为一个在未来完成的操作。

  map方法

  Exectuor还为我们提供了map方法,和内建的map用法类似。映射。

  future

  Future实例是由Executor.submit()创建的。可以理解为一个在未来完成的操作,这是异步编程的基础。通常情况下,我们执行io操作,访问url时(如下)在等待结果返回之前会产生阻塞,cpu不能做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

  示例:

  

复制代码
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
def foo(i):
    print(\'%s is running %s\'%(os.getpid(),i))
    time.sleep(random.randint(1, 3))
    return i**2
if __name__ == \'__main__\':
    print(\'cpu_num:\',os.cpu_count())
    executor=ProcessPoolExecutor()
    print(\'executor\',executor,type(executor))
    # futures=[]
    # for i in range(10):
    #     future=executor.submit(foo,i)
    #     futures.append(future)
    futures=[executor.submit(foo,i) for i in range(10)]
    executor.shutdown()
    #程序运行到这里有明显的时间间隔,可见是在shutdown存在的情况下,程序将future全部执行完,才继续往下走的
    print(\'主\')
    print(futures)
    for future in futures:
        print(future.result())
复制代码

  输出:

复制代码
cpu_num: 8
executor <concurrent.futures.process.ProcessPoolExecutor object at 0x00000276745AA978> <class \'concurrent.futures.process.ProcessPoolExecutor\'>
11740 is running 0
3156 is running 1
9928 is running 2
2208 is running 3
2324 is running 4
13080 is running 5
1892 is running 6
2964 is running 7
2208 is running 8
2324 is running 9
主
[<Future at 0x27674900e10 state=finished returned int>, <Future at 0x27674949dd8 state=finished returned int>, <Future at 0x27674949e80 state=finished returned int>, <Future at 0x27674949f28 state=finished returned int>, <Future at 0x27674949fd0 state=finished returned int>, <Future at 0x2767495a0b8 state=finished returned int>, <Future at 0x2767495a198 state=finished returned int>, <Future at 0x2767495a278 state=finished returned int>, <Future at 0x2767495a358 state=finished returned int>, <Future at 0x2767495a438 state=finished returned int>]
0
1
4
9
16
25
36
49
64
81
复制代码

  

  利用ThreadProcessExecutor爬虫

  

复制代码
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
def get(url):
    r=requests.get(url)
    return {\'url\':url,\'text\':r.text}
def parse(future):
    dic=future.result()          #future对象调用result方法取其值、
    f=open(\'db.text\',\'a\')
    date=\'url:%s\\n\'%len(dic[\'text\'])
    f.write(date)
    f.close()
if __name__ == \'__main__\':
    executor=ThreadPoolExecutor()
    url_l = [\'http://cn.bing.com/\', \'http://www.cnblogs.com/wupeiqi/\', \'http://www.cnblogs.com/654321cc/\',
                 \'https://www.cnblogs.com/\', \'http://society.people.com.cn/n1/2017/1012/c1008-29581930.html\',
                 \'http://www.xilu.com/news/shaonianxinzangyou5gedong.html\', ]
    futures=[]
    for url in url_l:
        executor.submit(get,url).add_done_callback(parse)         #与Pool进程池回调函数接收的是A函数的返回值(对象ApplyResult.get()得到的值)。
    executor.shutdown()                                           #这里回调函数parse,接收的参数是submit生成的 Future对象。
    print(\'主\')
复制代码

  输出:

 

以上是关于python并发之concurrent.futures的主要内容,如果未能解决你的问题,请参考以下文章

java线程的6种状态以及相互转换

并发编程路线

Python学习:python并发编程之协程

python并发编程之协程

Python并发之协程

Python并发编程之多线程