python学习道路(day11note)(协程,同步与异步的性能区别,url爬网页,select,RabbitMq)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python学习道路(day11note)(协程,同步与异步的性能区别,url爬网页,select,RabbitMq)相关的知识,希望对你有一定的参考价值。

1.协程

 1 #协程  又称微线程  是一种用户的轻量级线程   程序级别代码控制 就不用加机器
 2 #不同函数 = 不同任务   A函数切到B函数没有进行cpu级别的切换,而是程序级别的切换就是协程  yelied
 3 
 4 #单线程下多个任务流用协程,比如打电话可以切换,nginx
 5 #爽妹给你打电话的时候,她不说话,刘征电话过来时候你可以切过去,这时候要是爽妹说话,就会bibi响
 6 ‘‘‘
 7 
 8 协程的好处:
 9 无需线程上下文切换的开销
10 无需原子操作锁定及同步的开销
11   "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何
12 context
13 switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
14 方便切换控制流,简化编程模型
15 高并发 + 高扩展性 + 低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
16 
17 缺点:
18 无法利用多核资源:协程的本质是个单线程, 它不能同时将
19 单个CPU
20 的多个核用上, 协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
21 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

 

2.Greenlet and Gevent

 1 #greenlet 模块
 2 #greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator
 3 from greenlet import greenlet
 4 
 5 from greenlet import greenlet
 6 def test1():
 7     print(12)
 8     #time.sleep(1)  #但是遇到IO就会阻塞了,这里延迟了一秒,如果自动切换的话,应该立马执行gr2
 9     gr2.switch()
10     print(34)
11     gr2.switch()
12 
13 def test2():
14     print(56)
15     gr1.switch()
16     print(78)
17 
18 gr1 = greenlet(test1)  #生成协程
19 gr2 = greenlet(test2)  #生成协程
20 gr1.switch()   #启动协程
21 #但是遇到IO会不会自动切换呢?上面是手动切换的  引出 Gevent
 1 #Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,
 2 #(接着上面一条)它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
 3 import gevent
 4 
 5 def func1():
 6     print(\033[31;1m李闯在跟海涛搞...\033[0m) #1      1
 7     gevent.sleep(2)  #相当于io time.sleep 卡住,看看会不会自动切换,还是等待?
 8     print(\033[31;1m李闯又回去跟继续跟海涛搞...\033[0m) #4   6
 9 
10 def func2():
11     print(\033[32;1m李闯切换到了跟海龙搞...\033[0m) #2   2
12     gevent.sleep(1)  #这里自动切换的时候上面还在阻塞呢,所以又回来了,所以按 1234标识了走向
13     print(\033[32;1m李闯搞完了海涛,回来继续跟海龙搞...\033[0m) #3   4
14 
15 def func3():
16     print(3333)   #0   3
17     gevent.sleep(1)
18     print(4444)   #0   5
19 
20 gevent.joinall([   #joinall等待所有协程结束  这是一个列表
21     gevent.spawn(func1),  #产生协程
22     gevent.spawn(func2),
23     gevent.spawn(func3),
24 ])

 

3.同步与异步的性能区别

 1 #同步与异步的性能区别
 2 import gevent
 3 
 4 def task(pid):
 5     """
 6     Some non-deterministic task
 7     """
 8     gevent.sleep(0.5)
 9     print(Task %s done % pid)
10 
11 def synchronous(): #同步就是串行的效果
12     for i in range(1, 10):
13         task(i)
14 
15 def asynchronous():  #异步就是并发的效果
16     threads = [gevent.spawn(task, i) for i in range(10)]
17     gevent.joinall(threads)
18 
19 print(Synchronous:)   #同步
20 synchronous()
21 print(Asynchronous:)   #异步
22 asynchronous()

 

4.爬网页

 1 #url爬网页
 2 import gevent
 3 from  urllib.request import urlopen   #现在还是阻塞的模式,因为urllib遇到Io不知道这是Io操作,所以需要导入一个gevevt插件,
 4 #相当于打个补丁,就会把urllib 里面涉及IO操作的都改成异步的模式,不阻塞的模式
 5 from gevent import monkey  #补丁
 6 monkey.patch_all()   #补丁 注意顺序
 7 import time
 8 
 9 def pa_web_page(url):
10     print(get url,url)
11     req = urlopen(url) #抓取url
12     data = req.read()  #读取结果
13     print(data)
14     print(%d bytes received from %s. % (len(data), url))
15 
16 t1_start = time.time()  #开始时间
17 pa_web_page("https://www.baidu.com")
18 pa_web_page("http://www.xiaohuar.com")
19 print(time close t1,time.time()-t1_start) #做减法
20 
21 
22 #下面是协程gevent写法,遇到阻塞就会自动切换,节省了时间
23 t2_start = time.time()  #开始时间
24 gevent.joinall([
25         gevent.spawn(pa_web_page, https://www.baidu.com),  #pa_web_page,函数名 https://www.baidu.com url
26         gevent.spawn(pa_web_page, http://www.xiaohuar.com),
27 ])
28 print(time close t2,time.time()-t2_start) #做减法

 

5.通过gevent实现单线程下的多socket并发

server code

 1 import sys
 2 import socket
 3 import time
 4 import gevent
 5 
 6 from gevent import socket, monkey
 7 monkey.patch_all()
 8 
 9 def server(port):
10     s = socket.socket()
11     s.bind((0.0.0.0, port))
12     s.listen(500)
13     while True:
14         cli, addr = s.accept()
15         gevent.spawn(handle_request, cli)   #之前写线程sockserver的时候是起一个线程,这里是起协程
16         # handle_request自己写的方法  所有请求到这个函数区处理
17 
18 def handle_request(conn):
19     try:
20         while True:
21             data = conn.recv(1024)
22             print("recv:", data)
23             conn.send(data)
24             if not data:
25                 conn.shutdown(socket.SHUT_WR)  #相当于断开连接,清空了
26 
27     except Exception as  ex:
28         print(ex)
29     finally:
30         conn.close()
31 
32 if __name__ == __main__:
33     server(8001)

client code

 1 #并发100个链接  如果连接报错,就说明开不起线程了,确实支持大并发了
 2 import socket
 3 import threading
 4 
 5 def sock_conn():
 6     client = socket.socket()
 7     client.connect(("localhost",8001))
 8     count = 0
 9     while True:
10         #msg = input(">>:").strip()
11         #if len(msg) == 0:continue
12         client.send( ("hello %s" %count).encode("utf-8"))
13 
14         data = client.recv(1024)
15 
16         print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
17         count +=1
18     client.close()
19 
20 for i in range(100):
21     t = threading.Thread(target=sock_conn)
22     t.start()

 

6.Select\Poll\Epoll IO多路复用

select

 1 import socket
 2 import select
 3 import queue
 4 server = socket.socket()
 5 server.bind(("localhost",8001))
 6 server.listen(5)
 7 server.setblocking(0) #设置为非堵塞
 8 inputs = [server]   #select 维护的列表,也是是传过来的链接  首先是监听自己
 9 msg_queues = {}  #字典,为了收取数据作用,理论上应该有2个,一个是收,一个是取
10 outputs = []
11 
12 while True:
13     r_list,w_list,exception_list = select.select(inputs,outputs,inputs)   #inputs检测所有socket有没有消息古来  outputs不知道   inputs检测哪些socket有没有错(错误)
14 #针对 inputs 来返回哪些就绪的列表,所以r_list里面的就已经是就绪的  相当于链接
15     # print("r_list",r_list)
16     # print("w_list",w_list)
17     # print("e_list",exception_list)
18     for s in r_list:   #数据流
19         if s is server:  #这是一个新链接
20             conn,addr = s.accept() #接收请求  同时可以监听多个请求了
21             print("got a new conn",conn,addr)
22             inputs.append(conn)  #让select去监测客户端是否有数据过来
23             msg_queues[conn] = queue.Queue()   #为了给客户端返回数据,先创建的数据字典
24         else:
25             try:
26                 data = s.recv(1024)
27                 print("recv data from [%s]:[%s]" % (s.getpeername(),data.decode()))
28                 msg_queues[s].put(data)
29                 if s not in outputs:
30                     outputs.append(s)    #等下次select的时候,确保w_list的数据能返回给客户端
31             except ConnectionResetError as e:
32                 print("conn closed.",s.getpeername(),e)
33 
34                 inputs.remove(s)   #链接出问题,或意外终止
35                 if s in outputs:
36                     outputs.remove(s)
37                     del msg_queues[s]
38 
39     for s in w_list:   #给客户端返回追备好的数据
40         try:
41             data = msg_queues[s].get_nowait()
42             s.send(data.upper())
43         except queue.Empty as e:
44             outputs.remove(s)

selectors select的升级版,自动适应版本执行epool效率更高

 1 #selectors模块  这是自适应的,你系统默认支持的话,就会epool  》pool  》select  相比select更方便 写这个代码默认epool
 2 
 3 import selectors
 4 import socket
 5 
 6 def accept(sock, mask):
 7     conn, addr = sock.accept()  # Should be ready
 8     print(accepted, conn, from, addr)
 9     conn.setblocking(False)
10     sel.register(conn, selectors.EVENT_READ, read)
11     #监听数据流,如果消息事件来了,调用read方法 注册conn用于监控流了
12 
13 def read(conn, mask):
14     data = conn.recv(1000)  # Should be ready
15     if data:
16         print(echoing, repr(data), to, conn)
17         conn.send(data)  # Hope it won‘t block
18     else:
19         print(closing, conn)
20         sel.unregister(conn)   #删除链接清空 跟select remove一样
21         conn.close()
22 
23 sock = socket.socket()
24 sock.bind((localhost, 8001))  #端口是0-65535  1024系统保留
25 sock.listen(100)
26 sock.setblocking(False)
27 
28 sel = selectors.DefaultSelector()
29 sel.register(sock, selectors.EVENT_READ, accept)  #sock相当于注册,注册一个什么呢,注册一个EVENT_READ读事件  这只是注册呢没有实际监听
30 #相当于  select.select(inputs,outputs.... EVENT_READ 监听,如果有请求就会调用accept)
31 
32 while True:
33     events = sel.select()   #如果没有事件就会卡这里,select监听,
34     for key, mask in events:
35         callback = key.data   #相当于accept内存对象
36         print(key,mask)
37         callback(key.fileobj, mask)

 

7.RabbitMq 进程队列

server

 1 # !/usr/bin/env python
 2 import pika
 3 
 4 # credentials = pika.PlainCredentials(‘alex‘,‘alex3714‘)  假如需要验证的时候用这2条就可以连接
 5 # connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.10.140‘,credentials=credentials))
 6 
 7 connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.10.10.140))
 8 channel = connection.channel()
 9 
10 # 声明queue
11 channel.queue_declare(queue=hello1,durable=True)#durable=True  queue队列持久化,rabbitmq重启不会丢失,但是消息会丢
12 #如果之前这里生命过durable,在recv端也要这样声明
13 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
14 channel.basic_publish(exchange=‘‘,
15                       routing_key=hello1,
16                       body=Hello World!1,
17                       properties=pika.BasicProperties(   #消息持久化  rabbit重启消息不会丢
18                           delivery_mode=2,  # make message persistent   #消息持久化abbit重启消息不会丢
19                       ))
20 print(" [x] Sent ‘Hello World1!‘")
21 connection.close()

client

 1 # _*_coding:utf-8_*_
 2 __author__ = Alex Li
 3 import pika
 4 import time
 5 
 6 # credentials = pika.PlainCredentials(‘alex‘,‘alex3714‘)  假如需要验证的时候用这2条就可以连接
 7 # connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.10.140‘,credentials=credentials))
 8 
 9 connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.10.10.140))
10 channel = connection.channel()
11 
12 # You may ask why we declare the queue again ? we have already declared it in our previous code.
13 # We could avoid that if we were sure that the queue already exists. For example if send.py program
14 # was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good
15 # practice to repeat declaring the queue in both programs.
16 channel.queue_declare(queue=hello1,durable=True)  #如果确定这个queue声明过了,可以不用写,但是写上最好,因为不确定send端还是recv先启动
17 
18 def callback(ch, method, properties, body):  #body消息
19     #ch   channel对象   method   声明的一推参数,消息里面的一些属性信息  properties跟随消息传一些参数会在这个里面
20     print(" [x] Received %r" % body)
21     # time.sleep(10)  #用于测试work queue
22 
23 channel.basic_qos(prefetch_count=1)  #消息公平化,如果有一个消息没有处理完就别给我发新的
24 channel.basic_consume(callback,    #在hello queue里面收取消息执行callback函数
25                       queue=hello1,
26                       #no_ack=True
27                       )   #true  默认开启 work queue   这样可以确保即使消息发送的时候中断,也会受到信息no_ack=True这个是关闭了
28 
29 print( [*] Waiting for messages. To exit press CTRL+C)
30 channel.start_consuming()  #有消息就收,没有消息就会卡住  监听

 

以上是关于python学习道路(day11note)(协程,同步与异步的性能区别,url爬网页,select,RabbitMq)的主要内容,如果未能解决你的问题,请参考以下文章

python学习道路(day7note)(subprocess模块,面向对象)

python学习道路(day9note)(socketserver编程,ftp)

python学习道路(day8note)(抽象类,类的方法,异常处理,socket编程)

python学习道路(day5note)(列表生成式,生成器,装饰器,常用模块)

python学习道路(day6note)(time &datetime,random,shutil,shelve,xml处理,configparser,hashlib,logging模块,re

Python_Note_Day 10_Coroutine