Python之并发并行阻塞非租塞同步异步IO多路复用
Posted alexephor
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python之并发并行阻塞非租塞同步异步IO多路复用相关的知识,希望对你有一定的参考价值。
一、并发并行
并发:表示执行多个任务的能力
并行:表示同一时刻执行多个任务
二、模拟socket发送http请求
三大步骤:创建连接 要发送的东西 然后数据回来接收 socket默认情况下阻塞
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import socket 5 6 client = socket.socket() 7 client.setblocking(False) # 这里设置非阻塞 8 # 百度创建连接:阻塞 9 try: 10 client.connect((‘www.baidu.com‘, 80)) # 执行了但报错 11 except BlockingIOError as e: 12 pass 13 14 # 发送要东西了 15 client.sendall(b‘GET /s?wd=hello HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘) 16 17 # 百度给我的回复 18 chunk_list = [] 19 while True: 20 try: 21 # 阻塞 22 chunk = client.recv(8096) 23 if not chunk: 24 break 25 chunk_list.append(chunk) 26 except BlockingIOError: 27 pass 28 29 print(b‘‘.join(chunk_list).decode(‘utf8‘))
三、基于单线程和IO多路复用发送多个任务(并发方法一非阻塞)
IO多路复用作用:只是检测socket的变化 是否连接成功 是否返回数据 是否可读可写
rlist, wlist, elist = select.select(socket_list, conn_list, [], 0.005)
conn_list 检测其中所有socket对象是否与服务器连接成功 可写
socket_list 检测服务器是否有数据给我返回来 可读
[] 检测是否有错误
连接最大超出的时间为0.005秒
rlist 返回数据的放在rlist中
wlist 把连接成功的放在wlist列表中
实现的方式有两种
select.select(socket_list, conn_list, [], 0.005)
select监听的socket_list, conn_list内部会调用列表中的每一个值得fileno方法,获取该返回值并去系统中检测
方式一:
select.select([client1, client2], [client1, client2], [], 0.005)
方式二:高级版本 封装socket对象
select.select([Foo(client1), Foo(client2)], [Foo(client1), Foo(client2)], [], 0.005)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import socket 5 import select 6 7 client1 = socket.socket() 8 client1.setblocking(False) 9 try: 10 client1.connect(("www.baidu.com", 80)) 11 except BlockingIOError as e: 12 pass 13 14 client2 = socket.socket() 15 client2.setblocking(False) 16 try: 17 client2.connect(("www.so.com", 80)) 18 except BlockingIOError as e: 19 pass 20 socket_list = [client1, client2] 21 conn_list = [client1, client2] 22 while True: 23 rlist, wlist, elist = select.select(socket_list, conn_list, [], 0.005) 24 for sk in wlist: 25 if sk == client1: 26 sk.sendall(b‘GET /s?wd=hello HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘) 27 else: 28 sk.sendall(b‘GET /s?q=hello HTTP/1.0\r\nhost:www.so.com\r\n\r\n‘) 29 conn_list.remove(sk) 30 31 for sk in rlist: 32 chunk_list = [] 33 while True: 34 try: 35 chunk = sk.recv(8096) 36 if not chunk: 37 break 38 chunk_list.append(chunk) 39 except BlockingIOError as e: 40 break 41 body = b"".join(chunk_list) 42 print("----->", body.decode(‘utf8‘)) 43 # print("----->", body) 44 sk.close() 45 socket_list.remove(sk) 46 if not socket_list: 47 break 48 49 """ 50 回调:异步本质(通知) 51 def callback(arg): 52 print(arg) 53 func("www.baidu.com/s?wd=hello" callback) 54 print(123) 55 print(123) 56 print(123) 57 """
四、利用单线程实现高并发NB高级版本(高并发方法二异步非阻塞)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import socket 5 import select 6 7 8 class Req(object): 9 def __init__(self, sk, func): 10 self.sock = sk 11 self.func = func 12 13 def fileno(self): 14 return self.sock.fileno() 15 16 17 class Nb(object): 18 19 def __init__(self): 20 self.conn_list = [] 21 self.socket_list = [] 22 23 def add(self, url, func): 24 client = socket.socket() 25 client.setblocking(False) 26 try: 27 client.connect((url, 80)) 28 except BlockingIOError as e: 29 pass 30 obj = Req(client, func) 31 self.conn_list.append(obj) 32 self.socket_list.append(obj) 33 34 def run(self): 35 while True: 36 rlist, wlist, elist = select.select(self.socket_list, self.conn_list, [], 0.005) 37 for sk in wlist: 38 sk.sock.sendall(b‘GET /s?wd=hello HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘) 39 self.conn_list.remove(sk) 40 41 for sk in rlist: 42 chunk_list = [] 43 while True: 44 try: 45 chunk = sk.sock.recv(8096) 46 if not chunk: 47 break 48 chunk_list.append(chunk) 49 except BlockingIOError as e: 50 break 51 body = b"".join(chunk_list) 52 sk.func(body) 53 sk.sock.close() 54 self.socket_list.remove(sk) 55 if not self.socket_list: 56 break 57 58 59 def baidu_response(body): 60 print(‘百度下载的结果‘, body) 61 62 63 def sogou_response(body): 64 print(‘搜狗下载的结果‘, body) 65 66 67 t1 = Nb() 68 t1.add(‘www.baidu.com‘, baidu_response) 69 t1.add(‘www.so.com‘, sogou_response) 70 t1.run()
总结:
IO多路复用的作用: 检测多个socket是否发生变化
操作系统检测socket是否发生变化 有三种模式
select:最多1024个socket个数 循环去检测。
poll: 不限制监听socket个数,循环去检测。(水平触发)
epoll: 不限制监听socket个数,回调的方式(边缘触发) windows下没有 触发的机制可以采用简单点儿的高低电平来理解
linux下select.epoll 有点复杂注册哪些等等操作 用法差不多select.select
提高并发方案:多进程 多线程 异步非阻塞模块(Twisted)
什么是异步非阻塞?
非阻塞, 不等待 比如:创建socket对某个地址进行连接,获取接收数据时就会等待,连接成功/接收到数据时,
才执行后续操作,如果设置setblocking(False),上边两过程不再等待,但是要报错BlockingError,只要去捕获即可
异步:(通知)执行完成之后自动执行回调函数或者自动执行某些操作(通知)最根本就是执行完某个任务自动调用我给他的函数
比如:爬虫中向某个地址发送请求,当请求执行完成之后自动执行回调函数
什么是同步阻塞?
同步:按照顺序逐步执行
阻塞:等待
这么多代码其实可以不用写 基于异步非阻塞的框架 基于事件循环(驱动) Twisted
以上是关于Python之并发并行阻塞非租塞同步异步IO多路复用的主要内容,如果未能解决你的问题,请参考以下文章