Python基础15 - 协程异步IO

Posted auqf8612

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python基础15 - 协程异步IO相关的知识,希望对你有一定的参考价值。

@@@文章内容参照老男孩教育 Alex金角大王,武Sir银角大王@@@

一、协程

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员

协程,又称微线程,纤程。英文名Coroutine。一句说明什么是线程:协程是一种用户态的轻量级线程

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序

协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程

1、Greenlet

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

 1 from greenlet import greenlet
 2 
 3 def test1():
 4     print(12)
 5     gr2.switch()
 6     print(34)
 7     gr2.switch()
 8 
 9 def test2():
10     print(56)
11     gr1.switch()
12     print(78)
13 
14 gr1 = greenlet(test1)
15 gr2 = greenlet(test2)
16 gr1.switch()

2、Gevent

Gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入Python的轻量级协程。Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度

 1 import gevent
 2 
 3 def foo():
 4     print(foo)
 5     gevent.sleep(1)
 6     print(foo again)
 7 
 8 def bar():
 9     print(bar)
10     gevent.sleep(2)
11     print(bar again)
12 
13 
14 gevent.joinall([
15     gevent.spawn(foo),
16     gevent.spawn(bar),
17 ])

遇到IO阻塞时会自动切换任务

from urllib.request import urlopen
import gevent ,time
from gevent import monkey
monkey.patch_all() # 把当前程序的所有IO操作单独做上标记

def f(url):
    print(GET: %s %url)
    resp = urlopen(url)
    data = resp.read()
    print(%d字节 …… %s%(len(data),url))

urls = [https://www.python.org/,
        https://www.github.com/,
        http://www.163.com,
        ]
time_start = time.time()
for url in urls:
    f(url)
print(同步cost,time.time() - time_start)

async_time_start = time.time()
gevent.joinall([
    gevent.spawn(f,https://www.python.org/),
    gevent.spawn(f,https://www.github.com/),
    gevent.spawn(f,http://www.163.com/),
])
print(异步cost,time.time() - async_time_start)

通过gevent实现单线程下的多socket并发

技术分享图片
 1 # 服务器端
 2 import sys
 3 import socket
 4 import time
 5 import gevent
 6 from gevent import socket,monkey
 7 monkey.patch_all()
 8 
 9 def server(port):
10     s = socket.socket()
11     s.bind((0.0.0.0,port))
12     s.listen(500)
13     while True:
14         conn ,addr = s.accept()
15         gevent.spawn(handle_request,conn)
16 
17 def handle_request(conn):
18     try:
19         while True:
20             data = conn.recv(1024)
21             print(recv:,data.decode())
22             conn.send(data)
23             if not data:
24                 conn.shutdown(socket.SHUT_WR)
25     except Exception as ex:
26         print(ex)
27     finally:
28         conn.close()
29 
30 if __name__ == __main__:
31     server(8001)
Gevent_server
技术分享图片
 1 import socket
 2 
 3 HOST = localhost
 4 PORT = 8001
 5 s = socket.socket()
 6 s.connect((HOST,PORT))
 7 while True:
 8     msg = input(>>:)
 9     s.send(msg.encode(utf-8))
10     data = s.recv(1024)
11     print(recv:,data.decode())
12 s.close()
Gevent_client
技术分享图片
 1 import socket
 2 import threading
 3 
 4 def sock_conn():
 5     client = socket.socket()
 6     client.connect((localhost,8001))
 7     count = 0
 8     while True:
 9         client.send((hello %s%count).encode(utf-8))
10         data = client.recv(1024)
11         print([%s] recv from server:%threading.get_ident(),data.decode())
12         count += 1
13     client.close()
14 
15 for i in range(100):
16     t = threading.Thread(target=sock_conn)
17     t.start()
并发100个sock连接

二、事件驱动与异步IO

通常,我们写服务器处理模型的程序时,有以下几种模型:

(1)每收到一个请求,创建一个新的进程,来处理该请求;

(2)每收到一个请求,创建一个新的线程,来处理该请求;

(3)每收到一个请求,放入一个件列表,让主进程通过非阻塞I/O方式来处理请求

上面的几种方式,各有千秋,

第(1)种方式,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单

第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题

第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。

综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。简而言之,事件驱动分为二个部分:第一,注册事件;第二,触发事件。

另外两种常见的编程范式是(单线程)同步以及多线程编程。

Select\\Poll\\Epoll异步IO

番外篇 http://www.cnblogs.com/alex3714/articles/5876749.html

Select 
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销

Poll 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)

Epoll 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知

Python select

Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器

技术分享图片
 1 import select
 2 import socket
 3 import queue
 4 
 5 server = socket.socket()
 6 server.bind((localhost,8001))
 7 server.listen(1000)
 8 
 9 server.setblocking(False) # 不阻塞
10 
11 msg_dic = {}
12 inputs = [server,]
13 outputs = []
14 while True:
15     readable, writeable, exceptional = select.select(inputs, outputs, inputs)
16     for r in readable:
17         if r is server: # 代表来了一个新连接
18             conn,addr = r.accept()
19             print(来了个新连接,addr)
20             inputs.append(conn)
21             msg_dic[conn] = queue.Queue() # 初始化一个队列,后面存要返回给这个客户端的数据
22         else:
23             try:
24                 data = r.recv(1024)
25                 print(收到数据,data)
26                 msg_dic[r].put(data)
27                 outputs.append(r) # 放入返回的连接队列里
28             except:break
29 
30     for w in writeable: # 要返回给客户端的连接列表
31         data_to_client = msg_dic[w].get()
32         w.send(data_to_client) # 返回给客户端数据
33         outputs.remove(w) # 确保下次循环的时候,不返回这个已经处理完的连接
34 
35     for e in exceptional:
36         if e in outputs:
37             outputs.remove(e)
38         inputs.remove(e)
39         del msg_dic[e]
select_socket_server
技术分享图片
 1 import socket
 2 import sys
 3 
 4 messages = [bThis is the message. ,
 5             bIt will be sent ,
 6             bin parts.,
 7             ]
 8 server_address = (127.0.0.1, 8001)
 9 
10 socks = [socket.socket() for i in range(100)]
11 
12 print (sys.stderr, connecting to %s port %s % server_address)
13 for s in socks:
14     s.connect(server_address)
15 
16 for message in messages:
17 
18     for s in socks:
19         # print >> sys.stderr, ‘%s: sending "%s"‘ % (s.getsockname(), message)
20         s.send(message)
21 
22     for s in socks:
23         data = s.recv(1024)
24         # print >> sys.stderr, ‘%s: received "%s"‘ % (s.getsockname(), data)
25         if not data:
26             print (sys.stderr, closing socket, s.getsockname())
27             s.close()
select_socket_client

selectors模块

该模块基于select模块原语构建的高级别和高效的/输出多路复用。推荐用户使用这个模块,除非他们希望对os级别原语进行精确控制

 1 import selectors
 2 import socket
 3 
 4 sel = selectors.DefaultSelector()
 5 
 6 def accept(sock, mask):
 7     conn, addr = sock.accept()
 8     print(addr)
 9     conn.setblocking(False)
10     sel.register(conn, selectors.EVENT_READ, read)
11 
12 def read(conn, mask):
13     data = conn.recv(1024)
14     if data:
15         print(recv:,data)
16         conn.send(data)
17     else:
18         print(closing,conn)
19         sel.unregister(conn)
20         conn.close()
21 
22 sock = socket.socket()
23 sock.bind((localhost,8001))
24 sock.listen(100)
25 sock.setblocking(False)
26 sel.register(sock, selectors.EVENT_READ ,accept)
27 
28 while True:
29     events = sel.select()
30     for key, mask in events:
31         callback = key.data
32         callback(key.fileobj, mask)

 






以上是关于Python基础15 - 协程异步IO的主要内容,如果未能解决你的问题,请参考以下文章

Python黑魔法 --- 异步IO( asyncio) 协程

高性能异步爬虫

python---爬虫相关性能(各个异步模块的使用,和自定义异步IO模块)

Python协程异步IO

python协程和异步IO——IO多路复用

Python异步IO之协程:从yield from到async的使用