使用concurrent.futures模块并发,实现进程池线程池
Posted 听风。
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用concurrent.futures模块并发,实现进程池线程池相关的知识,希望对你有一定的参考价值。
一、关于concurrent.futures模块
Python标准库为我们提供了threading和multiprocessing模块编写相应的异步多线程/多进程代码。从Python3.2开始,标准库为我们提供了concurrent.futures
模块,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
两个类ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。实现了对threading
和multiprocessing
的更高级的抽象,对编写线程池/进程池提供了直接的支持。
concurrent.futures基础模块是executor和future。
concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。
Future这个概念相信有java和nodejs下编程经验的朋友肯定不陌生了,你可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。
Executor中定义了submit()
方法,这个方法的作用是提交一个可执行的回调task
,并返回一个future实例。future对象代表的就是给定的调用。
二、submit()
方法实现进程池/线程池
进程池
from concurrent.futures import ProcessPoolExecutor import os,time,random def task(n): print(‘%s is running‘ %os.getpid()) time.sleep(2) return n**2 if __name__ == ‘__main__‘: p=ProcessPoolExecutor() #不填则默认为cpu的个数 l=[] start=time.time() for i in range(10): obj=p.submit(task,i) #submit()方法返回的是一个future实例,要得到结果需要用obj.result() l.append(obj) p.shutdown() #类似用from multiprocessing import Pool实现进程池中的close及join一起的作用 print(‘=‘*30) # print([obj for obj in l]) print([obj.result() for obj in l]) print(time.time()-start) #上面方法也可写成下面的方法 # start = time.time() # with ProcessPoolExecutor() as p: #类似打开文件,可省去.shutdown() # future_tasks = [p.submit(task, i) for i in range(10)] # print(‘=‘ * 30) # print([obj.result() for obj in future_tasks]) # print(time.time() - start)
线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import threading import os,time,random def task(n): print(‘%s:%s is running‘ %(threading.currentThread().getName(),os.getpid())) time.sleep(2) return n**2 if __name__ == ‘__main__‘: p=ThreadPoolExecutor() #不填则默认为cpu的个数*5 l=[] start=time.time() for i in range(10): obj=p.submit(task,i) l.append(obj) p.shutdown() print(‘=‘*30) print([obj.result() for obj in l]) print(time.time()-start) #上面方法也可写成下面的方法 # start = time.time() # with ThreadPoolExecutor() as p: #类似打开文件,可省去.shutdown() # future_tasks = [p.submit(task, i) for i in range(10)] # print(‘=‘ * 30) # print([obj.result() for obj in future_tasks]) # print(time.time() - start)
默认为异步执行
#p.submit(task,i).result()即同步执行 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random def task(n): print(‘%s is running‘ %os.getpid()) time.sleep(2) return n**2 if __name__ == ‘__main__‘: p=ProcessPoolExecutor() start=time.time() for i in range(10): res=p.submit(task,i).result() print(res) print(‘=‘*30) print(time.time()-start)
三、回调函数
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import requests import os import time from threading import currentThread def get_page(url): print(‘%s:<%s> is getting [%s]‘ %(currentThread().getName(),os.getpid(),url)) response=requests.get(url) time.sleep(2) return {‘url‘:url,‘text‘:response.text} def parse_page(res): #此处的res是一个p.submit获得的一个future对象,不是结果 res=res.result() #res.result()拿到的才是对应的结果 print(‘%s:<%s> parse [%s]‘ %(currentThread().getName(),os.getpid(),res[‘url‘])) with open(‘db.txt‘,‘a‘) as f: parse_res=‘url:%s size:%s\n‘ %(res[‘url‘],len(res[‘text‘])) f.write(parse_res) if __name__ == ‘__main__‘: # p=ProcessPoolExecutor() p=ThreadPoolExecutor() urls = [ ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ‘https://www.baidu.com‘, ] for url in urls: # multiprocessing.pool_obj.apply_async(get_page,args=(url,),callback=parse_page) p.submit(get_page, url).add_done_callback(parse_page) #与之前的回调函数拿到的结果不同,这里拿到的是前面submit方法执行完后返回的对象,要.result才能拿到对应的结果 p.shutdown() print(‘主‘,os.getpid())
四、map方法
和内置函数map差不多的用法,这个方法返回一个map(func, *iterables)迭代器,迭代器中的回调执行返回的结果有序的。
以下是通过concurrent.futures模块下类ThreadPoolExecutor和ProcessPoolExecutor实例化的对象的map方法实现进程池、线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time def task(n): print(‘%s is running‘ %os.getpid()) time.sleep(2) return n**2 if __name__ == ‘__main__‘: # p=ProcessPoolExecutor() p=ThreadPoolExecutor() start = time.time() obj=p.map(task,range(10)) p.shutdown() print(‘=‘*30) print(list(obj)) print(time.time() - start)
以上是关于使用concurrent.futures模块并发,实现进程池线程池的主要内容,如果未能解决你的问题,请参考以下文章
Python并发编程之线程池/进程池--concurrent.futures模块