python—day32 异步 + 回调 Eventgevent 协程单线程下实现遇到IO切换

Posted kermitjam

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python—day32 异步 + 回调 Eventgevent 协程单线程下实现遇到IO切换相关的知识,希望对你有一定的参考价值。

异步 + 回调:就是把下载好的东西回调主进程执行 或者回调给线程,哪个线程闲着就执行

 

 1 #进程的异步 + 回调
 2 # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 3 #
 4 # import requests
 5 # import os,time,random
 6 # def get(url):
 7 #     print(‘%s get %s‘%(os.getpid(),url) )
 8 #
 9 #     response = requests.get(url)
10 #     time.sleep(random.randint(1, 3))
11 #
12 #     if response.status_code == 200 :
13 #         #干解析的活 只要下载完立刻进行解析
14 #         return response.text
15 #
16 # def pasrse(obj):
17 #     res = obj.result()
18 #     print(‘%s 解析结果为:%s‘ %(os.getpid(),len(res)))
19 #
20 # if __name__ == ‘__main__‘:
21 #     urls = [
22 #         ‘https://www.baidu.com/‘,
23 #         ‘https://www.baidu.com/‘,
24 #         ‘https://www.baidu.com/‘,
25 #         ‘https://www.baidu.com/‘,
26 #         ‘https://www.baidu.com/‘,
27 #         ‘http://www.sina.com.cn/‘,
28 #         ‘http://www.sina.com.cn/‘,
29 #         ‘http://www.sina.com.cn/‘
30 #     ]
31 #     pool = ProcessPoolExecutor(4)
32 #     # objs = []
33 #     for url in urls:
34 #         #把get函数和url任务扔进进程池
35 #         obj = pool.submit(get,url)
36 #         #提交完后给obj对象绑定了一个工具pasrse
37 #         #任务有返回值就会自动运行,有结果立即调用解析方法pasrse,完成了解耦
38 #         obj.add_done_callback(pasrse)
39 #
40 #     print(‘主进程 %s‘%os.getpid())
41 #         # objs.append(obj)
42 #         # res = pool.submit(get,url).result() 同步解析
43 #     # pool.shutdown(wait=True)
44 #
45 #     #问题
46 #     #1、任务的返回值不能得到及时的处理,必须等到所有任务都运行完毕才能继续统一进行处理
47 #     #2、解析的过程是串行执行的,如果解析一次需要花费2s,解析9次则需要花费18秒
48 #
49 #     # 串行了
50 #     # for obj in objs:
51 #     #     res = obj.result()
52 #     #     pasrse(res)
53 
54 
55 
56 #哪个线程闲着就用回调函数
57 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
58 from threading import current_thread
59 import requests
60 import os,time,random
61 def get(url):
62     print(%s get %s%(current_thread().name,url) )
63 
64     response = requests.get(url)
65     time.sleep(random.randint(1, 3))
66 
67     if response.status_code == 200 :
68         #干解析的活 只要下载完立刻进行解析
69         return response.text
70 
71 def pasrse(obj):
72     res = obj.result()
73     print(%s 解析结果为:%s %(current_thread().name,len(res)))
74 
75 if __name__ == __main__:
76     urls = [
77         https://www.baidu.com/,
78         https://www.baidu.com/,
79         https://www.baidu.com/,
80         https://www.baidu.com/,
81         https://www.baidu.com/,
82         http://www.sina.com.cn/,
83         http://www.sina.com.cn/,
84         http://www.sina.com.cn/
85     ]
86     pool = ThreadPoolExecutor(4)
87     # objs = []
88     for url in urls:
89         #把get函数和url任务扔进进程池
90         obj = pool.submit(get,url)
91         #提交完后给obj对象绑定了一个工具pasrse
92         #任务有返回值就会自动运行,有结果立即调用解析方法pasrse,完成了解耦
93         obj.add_done_callback(pasrse)
94 
95     print(主线程 %s%current_thread().name)

线程Queue:

 1 import queue
 2 
 3 q=queue.Queue(3) #队列:先进先出
 4 q.put(1)
 5 q.put(2)
 6 q.put(3)
 7 # q.put(4)
 8 
 9 print(q.get())
10 print(q.get())
11 print(q.get())
12 
13 
14 q=queue.LifoQueue(3) #堆栈:后进先出
15 
16 q.put(a)
17 q.put(b)
18 q.put(c)
19 
20 print(q.get())
21 print(q.get())
22 print(q.get())
23 
24 
25 q=queue.PriorityQueue(3) #优先级队列:可以以小元组的形式往队列里存值,第一个元素代表优先级,数字越小优先级越高
26 q.put((10,user1))
27 q.put((-3,user2))
28 q.put((-2,user3))
29 
30 
31 print(q.get())
32 print(q.get())
33 print(q.get())

 

线程Event:event.wait()

 1 from threading import Event,current_thread,Thread
 2 import time
 3 
 4 event=Event()
 5 
 6 def check():
 7     print(%s 正在检测服务是否正常.... %current_thread().name)
 8     time.sleep(5)
 9     event.set()
10 
11 
12 def connect():
13     count=1
14     while not event.is_set():
15         if count ==  4:
16             print(尝试的次数过多,请稍后重试)
17             return
18         print(%s 尝试第%s次连接... %(current_thread().name,count))
19         event.wait(1)
20         count+=1
21     print(%s 开始连接... % current_thread().name)
22 
23 if __name__ == __main__:
24     t1=Thread(target=connect)
25     t2=Thread(target=connect)
26     t3=Thread(target=connect)
27 
28     c1=Thread(target=check)
29 
30     t1.start()
31     t2.start()
32     t3.start()
33     c1.start()

gevent:

 

 1 from gevent import monkey;monkey.patch_all()
 2 from threading import current_thread
 3 import gevent
 4 import time
 5 
 6 def eat():
 7     print(%s eat 1 %current_thread().name)
 8     time.sleep(5)
 9     print(%s eat 2 %current_thread().name)
10 def play():
11     print(%s play 1 %current_thread().name)
12     time.sleep(3)
13     print(%s play 2 %current_thread().name)
14 
15 g1=gevent.spawn(eat)
16 g2=gevent.spawn(play)
17 
18 # gevent.sleep(100)
19 # g1.join()
20 # g2.join()
21 print(current_thread().name)
22 gevent.joinall([g1,g2])

 

 

 

协程:

1、单线程下实现并发:协程

  并发指的是多个任务看起来是同时运行的

  并发实现的本质:切换 + 保存状态

  并发、并行、串行

  并发:看起来是同时运行,切换 + 保存状态

    实现并行,4个cpu能够并行4个任务

  串行:一个人完完整整地执行完毕才能运行下一个任务

 

 1 import time
 2 def consumer():
 3     ‘‘‘任务1:接收数据,处理数据‘‘‘
 4     while True:
 5         x=yield
 6 
 7 
 8 def producer():
 9     ‘‘‘任务2:生产数据‘‘‘
10     g=consumer()
11     next(g)
12     for i in range(10000000):
13         g.send(i)
14 
15 start=time.time()
16 #基于yield保存状态,实现两个任务直接来回切换,即并发的效果
17 #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
18 producer() #1.0202116966247559
19 
20 
21 stop=time.time()
22 print(stop-start)

 

 1 import time
 2 def consumer(res):
 3     ‘‘‘任务1:接收数据,处理数据‘‘‘
 4     pass
 5 
 6 def producer():
 7     ‘‘‘任务2:生产数据‘‘‘
 8     res=[]
 9     for i in range(10000000):
10         res.append(i)
11 
12     consumer(res)
13     # return res
14 
15 start=time.time()
16 #串行执行
17 res=producer()
18 stop=time.time()
19 print(stop-start)
 1 # 纯计算的任务串行执行
 2 import time
 3 def task1():
 4     res=1
 5     for i in range(1000000):
 6         res+=i
 7 
 8 def task2():
 9     res=1
10     for i in range(1000000):
11         res*=i
12 
13 start=time.time()
14 #基于yield保存状态,实现两个任务直接来回切换,即并发的效果
15 #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
16 task1()
17 task2()
18 stop=time.time()
19 print(stop-start)
20 
21 
22 
23 # 纯计算的任务并发执行
24 import time
25 def task1():
26     res=1
27     for i in range(1000000):
28         res+=i
29         yield
30         time.sleep(10000)
31         print(task1)
32 
33 def task2():
34     g=task1()
35     res=1
36     for i in range(1000000):
37         res*=i
38         next(g)
39         print(task2)
40 
41 start=time.time()
42 #基于yield保存状态,实现两个任务直接来回切换,即并发的效果
43 #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
44 task2()
45 stop=time.time()
46 print(stop-start)

单线程下实现遇到IO切换:

 

 1 from greenlet import greenlet
 2 import time
 3 
 4 def eat(name):
 5     print(%s eat 1 %name)
 6     time.sleep(30)
 7     g2.switch(alex)
 8     print(%s eat 2 %name)
 9     g2.switch()
10 def play(name):
11     print(%s play 1 %name)
12     g1.switch()
13     print(%s play 2 %name)
14 
15 g1=greenlet(eat)
16 g2=greenlet(play)
17 
18 g1.switch(egon)

 

以上是关于python—day32 异步 + 回调 Eventgevent 协程单线程下实现遇到IO切换的主要内容,如果未能解决你的问题,请参考以下文章

Python Day35进程池,回调函数

day35 爬虫简述

Python_Day11_同步IO和异步IO

PHP回调函数到底是个什么

Pybind11:为啥来自 Python 的异步调用不能在 C++ 中正确执行回调?

python异步编程--回调模型(selectors模块)