IO多路复用/基于IO多路复用+socket实现并发请求/协程
Posted rixian
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了IO多路复用/基于IO多路复用+socket实现并发请求/协程相关的知识,希望对你有一定的参考价值。
http://www.cnblogs.com/alex3714/articles/5876749.html
http://www.cnblogs.com/Eva-J/articles/8324837.html
http://www.cnblogs.com/linhaifeng/articles/6817679.html
一.IO多路复用
1.IO多路复用作用:检测多个socket是否已经发生变化(是否已经连接成功/是否已经获取数据)(可读/可写)
2.select,poll,epoll都是I/O多路复用的具体的实现,之所以有这三个鬼东西的存在,其实是他们出现是有先后顺序的
(1).I/O多路复用这个概念被提出来以后,select是第一个实现的
select会修改传入的参数数组,只能监听1024个链接
select不是线程安全的
(2).14年以后,一帮人又实现了poll,poll修复了select的很多问题
poll去掉了1024个链接的限制,于是要多少链接呐,你开心就好
poll从设计上来说,不再修改传入的数据,不过这个要看你的平台,所以行走江湖,小心为妙
(3).2002年,大设备部Davide Libenzi实现了epoll
epoll可以说是I/O多路复用最新的实现,epoll修复了poll和select绝大部分问题
epoll现在是线程安全的
epoll现在不仅告诉你sock组里有数据,还会告诉你哪个sock组有数据
比较坑爹的是epoll只用Linux提供支持,默认集成到了Linux内核中
Windows Python:
提供:select
Mac Python:
提供:select
Linux Python:
提供:select,poll,epoll
对于select操作:
句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间) 参数: 可接受四个参数(前三个必须) 返回值:三个列表 select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。 1、当 参数1 序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1 序列中 2、当 参数2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2 序列中 3、当 参数3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3 序列中 4、当 超时时间 未设置,则select会一直阻塞,直到监听的句柄发生变化 当 超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
2.基于I/O多路复用+socket实现并发请求(一个线程100个请求)
应用功能:I/O多路复用;socket非阻塞
(1).以前所使用的socket发送请求:
import socket import requests # 方式一 ret = requests.get(‘https://www.baidu.com/s?wd=alex‘) # 方式二 client = socket.socket() # 百度创建连接: 阻塞 client.connect((‘www.baidu.com‘,80)) # 问百度我要什么? client.sendall(b‘GET /s?wd=alex HTTP/1.0 host:www.baidu.com ‘) # http协议 # 我等着接收百度给我的回复 chunk_list = [] while True: chunk = client.recv(8096) if not chunk: break chunk_list.append(chunk) body = b‘‘.join(chunk_list) print(body.decode(‘utf-8‘))
import socket import requests ##### 解决并发:单线程 ##### # 方式一 key_list = [‘alex‘,‘db‘,‘sb‘] for item in key_list: ret = requests.get(‘https://www.baidu.com/s?wd=%s‘ %item) # 方式二 def get_data(key): # 方式二 client = socket.socket() # 百度创建连接: 阻塞 client.connect((‘www.baidu.com‘,80)) # 问百度我要什么? client.sendall(b‘GET /s?wd=alex HTTP/1.0 host:www.baidu.com ‘) # 我等着接收百度给我的回复 chunk_list = [] while True: chunk = client.recv(8096) if not chunk: break chunk_list.append(chunk) body = b‘‘.join(chunk_list) print(body.decode(‘utf-8‘)) key_list = [‘alex‘,‘db‘,‘sb‘] for item in key_list: get_data(item) ##### 解决并发:多线程 ##### import threading key_list = [‘alex‘,‘db‘,‘sb‘] for item in key_list: t = threading.Thread(target=get_data,args=(item,)) t.start() # #################### 解决并发:单线程+IO不等待 #################### # IO请求? # 数据回来了?
(2).使用I/O多路复用+socket实现
import socket client = socket.socket() client.setblocking(False) # 将原来阻塞的位置变成非阻塞(报错) # 百度创建连接: 阻塞 try: client.connect((‘www.baidu.com‘,80)) # 执行了但报错了 except BlockingIOError as e: pass # 检测到已经连接成功 # 问百度我要什么? client.sendall(b‘GET /s?wd=alex HTTP/1.0 host:www.baidu.com ‘) # 我等着接收百度给我的回复 chunk_list = [] while True: chunk = client.recv(8096) # 将原来阻塞的位置变成非阻塞(报错) if not chunk: break chunk_list.append(chunk) body = b‘‘.join(chunk_list) print(body.decode(‘utf-8‘))
import socket import select client1 = socket.socket() client1.setblocking(False) # 百度创建连接: 非阻塞 try: client1.connect((‘www.baidu.com‘,80)) except BlockingIOError as e: pass client2 = socket.socket() client2.setblocking(False) # 搜狗创建连接: 非阻塞 try: client2.connect((‘www.sogou.com‘,80)) except BlockingIOError as e: pass client3 = socket.socket() client3.setblocking(False) # 淘宝创建连接: 非阻塞 try: client3.connect((‘www.taobao.com‘,80)) except BlockingIOError as e: pass socket_list = [client1,client2,client3] conn_list = [client1,client2,client3] while True: rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005) # wlist中表示已经连接成功的socket对象 for sk in wlist: if sk == client1: sk.sendall(b‘GET /s?wd=alex HTTP/1.0 host:www.baidu.com ‘) elif sk==client2: sk.sendall(b‘GET /web?query=fdf HTTP/1.0 host:www.sogou.com ‘) else: sk.sendall(b‘GET /s?wd=alex HTTP/1.0 host:www.taobao.com ‘) conn_list.remove(sk) for sk in rlist: chunk_list = [] while True: try: chunk = sk.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b‘‘.join(chunk_list) # print(body.decode(‘utf-8‘)) print(‘------------>‘,body) sk.close() socket_list.remove(sk) if not socket_list: break
import socket import select class Req(object): def __init__(self,sk,func): self.sock = sk self.func = func def fileno(self): return self.sock.fileno() class Nb(object): def __init__(self): self.conn_list = [] self.socket_list = [] def add(self,url,func): client = socket.socket() client.setblocking(False) # 非阻塞 try: client.connect((url, 80)) except BlockingIOError as e: pass obj = Req(client,func) self.conn_list.append(obj) self.socket_list.append(obj) def run(self): while True: rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005) # wlist中表示已经连接成功的req对象 for sk in wlist: # 发生变换的req对象 sk.sock.sendall(b‘GET /s?wd=alex HTTP/1.0 host:www.baidu.com ‘) self.conn_list.remove(sk) for sk in rlist: chunk_list = [] while True: try: chunk = sk.sock.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b‘‘.join(chunk_list) # print(body.decode(‘utf-8‘)) sk.func(body) sk.sock.close() self.socket_list.remove(sk) if not self.socket_list: break def baidu_repsonse(body): print(‘百度下载结果:‘,body) def sogou_repsonse(body): print(‘搜狗下载结果:‘, body) def cnblogs_repsonse(body): print(‘博客园下载结果:‘, body) t1 = Nb() t1.add(‘www.baidu.com‘,baidu_repsonse) t1.add(‘www.sogou.com‘,sogou_repsonse) t1.add(‘www.cnblogs.com‘,cnblogs_repsonse) t1.run()
(3).基于事件循环实现的异步非阻塞框架:yfz
11111
以上是关于IO多路复用/基于IO多路复用+socket实现并发请求/协程的主要内容,如果未能解决你的问题,请参考以下文章