并发编程---线程queue---进程池线程池---异部调用(回调机制)
Posted Mr。yang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程---线程queue---进程池线程池---异部调用(回调机制)相关的知识,希望对你有一定的参考价值。
线程
- 队列:先进先出
- 堆栈:后进先出
- 优先级:数字越小优先级越大,越先输出
import queue q = queue.Queue(3) # 先进先出-->队列 q.put(\'first\') q.put(2) # q.put(\'third\') # q.put(4) #由于没有人取走,就会卡主 q.put(4,block=False) #等同于q.get_nowait(), Ture 阻塞,Flase不阻塞,报异常满了 # # q.put(4,block=True,timeout=3) print(q.get()) print(q.get()) print(q.get()) print(q.get(block=True,timeout=3)) # 阻塞等待3秒 没有取走数据就报异常 # print(q.get(block=False)) #等同于q.get_nowait() # print(q.get_nowait()) q = queue.LifoQueue(3) #后进先出-->堆栈 q.put(\'first\') q.put(2) q.put(\'third\') print(q.get()) print(q.get()) print(q.get()) \'\'\' 打印结果: third 2 first \'\'\' q = queue.PriorityQueue(3) #优先级队列 q.put((10,\'one\')) q.put((40,\'two\')) q.put((30,\'three\')) print(q.get()) print(q.get()) print(q.get()) \'\'\' 数字越小优先级越高 打印结果 (10, \'one\') (30, \'three\') (40, \'two\') \'\'\'
进程池线程池
- 池:是用来对进程(线程)的数量加以限制
- 进程池:计算密集型,用多进程
- 线程池:IO密集型,用多线程,例如:sockect网络通信就应该用多线程
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random \'\'\' sockect网络通信是IO操作,所以用多线程 计算密集型:用多进程 \'\'\' def task(name): print(\'name:%s pid:%s run\' %(name,os.getpid())) time.sleep(random.randint(1,3)) if __name__ == \'__main__\': # pool = ProcessPoolExecutor(4) # 进程池最多装4个进程,不指定的话默认是cpu的核数 pool = ThreadPoolExecutor(5) for i in range(10): pool.submit(task,\'yang%s\' %i) # 异步调用池子收了10个任务,但同一时间只有4个任务在进行 pool.shutdown(wait=True) # 类似join 代表往池子里面丢任务的入口关掉 计数器-1 print(\'主\') \'\'\' 打印结果: name:yang0 pid:11120 run name:yang1 pid:11120 run name:yang2 pid:11120 run name:yang3 pid:11120 run name:yang4 pid:11120 run name:yang5 pid:11120 run name:yang6 pid:11120 run name:yang7 pid:11120 run name:yang8 pid:11120 run name:yang9 pid:11120 run 主 \'\'\' from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread import os,time,random def task(): print(\'name:%s pid:%s run\' %(currentThread().getName(),os.getpid())) time.sleep(random.randint(1,3)) if __name__ == \'__main__\': # pool = ProcessPoolExecutor(4) # 进程池最多装4个进程,不指定的话默认是cpu的核数 pool = ThreadPoolExecutor(5) for i in range(10): pool.submit(task) # 异步调用池子收了10个任务,但同一时间只有4个任务在进行 pool.shutdown(wait=True) # 类似join 代表往池子里面丢任务的入口关掉 计数器-1 print(\'主\') \'\'\' 打印结果: name:ThreadPoolExecutor-0_0 pid:14052 run name:ThreadPoolExecutor-0_1 pid:14052 run name:ThreadPoolExecutor-0_2 pid:14052 run name:ThreadPoolExecutor-0_3 pid:14052 run name:ThreadPoolExecutor-0_4 pid:14052 run name:ThreadPoolExecutor-0_2 pid:14052 run name:ThreadPoolExecutor-0_1 pid:14052 run name:ThreadPoolExecutor-0_3 pid:14052 run name:ThreadPoolExecutor-0_4 pid:14052 run name:ThreadPoolExecutor-0_0 pid:14052 run 主 \'\'\'
同步调用和异步调用
提交任务的两种方式:
- 同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行
- 异步调用:提交完任务后,不在原地等待任务执行完。回调机制:自动触发
#1.同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行 from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print(\'%s is laing\' %name) time.sleep(random.randint(3,5)) res = random.randint(7,13)*\'#\' return {\'name\':name,\'res\':res} def weigh(shit): name = shit[\'name\'] size = len(shit[\'res\']) print(\'%s 拉了 <%s>kg\' %(name,size)) if __name__ == \'__main__\': pool = ThreadPoolExecutor(10) shit1 = pool.submit(la,\'alex\').result() weigh(shit1) shit2 = pool.submit(la,\'yang\').result() weigh(shit2) shit3 = pool.submit(la,\'hang\').result() weigh(shit3) \'\'\' 打印结果: alex is laing alex 拉了 <8>kg yang is laing yang 拉了 <8>kg hang is laing hang 拉了 <7>kg \'\'\'
#2.异步调用:提交完任务后,不在原地等待任务执行完 from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print(\'%s is laing\' %name) time.sleep(random.randint(3,5)) res = random.randint(7,13)*\'#\' return {\'name\':name,\'res\':res} # weigh({\'name\':name,\'res\':res}) # 这样写,所有功能 不能体现出解耦合 def weigh(shit): shit = shit.result() # 拿到是一个对象,需要进行result() name = shit[\'name\'] size = len(shit[\'res\']) print(\'%s 拉了 <%s>kg\' %(name,size)) if __name__ == \'__main__\': pool = ThreadPoolExecutor(10) shit1 = pool.submit(la,\'alex\').add_done_callback(weigh) shit2 = pool.submit(la,\'yang\').add_done_callback(weigh) shit3 = pool.submit(la,\'hang\').add_done_callback(weigh) \'\'\' 打印结果: alex is laing yang is laing hang is laing hang 拉了 <10>kg alex 拉了 <7>kg yang 拉了 <12>kg \'\'\'
异步调用的应用
from concurrent.futures import ThreadPoolExecutor import requests import time def get(url): print(\'GET %s\'%url) response = requests.get(url) time.sleep(3) return {\'url\':url,\'content\':response.text} def parse(res): res = res.result() print(\'%s parse res is %s\' %(res[\'url\'],len(res[\'content\']))) if __name__ == \'__main__\': urls = [ \'http://www.cnblogs.com/linhaifeng\', \'https://www.python.org\', \'https://www.openstack.org\', ] pool = ThreadPoolExecutor(2) for url in urls: pool.submit(get,url).add_done_callback(parse) \'\'\' 打印结果: GET http://www.cnblogs.com/linhaifeng GET https://www.python.org http://www.cnblogs.com/linhaifeng parse res is 16320 GET https://www.openstack.org https://www.python.org parse res is 49273 https://www.openstack.org parse res is 64040 \'\'\'
以上是关于并发编程---线程queue---进程池线程池---异部调用(回调机制)的主要内容,如果未能解决你的问题,请参考以下文章
并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制
并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制