Python之并发并行阻塞非租塞同步异步IO多路复用

Posted alexephor

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python之并发并行阻塞非租塞同步异步IO多路复用相关的知识,希望对你有一定的参考价值。

一、并发并行

并发:表示执行多个任务的能力

并行:表示同一时刻执行多个任务

二、模拟socket发送http请求

三大步骤:创建连接 要发送的东西 然后数据回来接收    socket默认情况下阻塞

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 import socket
 5 
 6 client = socket.socket()
 7 client.setblocking(False)  # 这里设置非阻塞  
 8 # 百度创建连接:阻塞
 9 try:
10     client.connect((www.baidu.com, 80))  # 执行了但报错
11 except BlockingIOError as e:
12     pass
13 
14 # 发送要东西了
15 client.sendall(bGET /s?wd=hello HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n)
16 
17 # 百度给我的回复
18 chunk_list = []
19 while True:
20     try:
21         # 阻塞
22         chunk = client.recv(8096)
23         if not chunk:
24             break
25         chunk_list.append(chunk)
26     except BlockingIOError:
27         pass
28 
29 print(b‘‘.join(chunk_list).decode(utf8))

 三、基于单线程和IO多路复用发送多个任务(并发方法一非阻塞)

IO多路复用作用:只是检测socket的变化 是否连接成功 是否返回数据 是否可读可写

rlist, wlist, elist = select.select(socket_list, conn_list, [], 0.005)  

conn_list   检测其中所有socket对象是否与服务器连接成功   可写
socket_list 检测服务器是否有数据给我返回来 可读
[]    检测是否有错误
    连接最大超出的时间为0.005秒
rlist 返回数据的放在rlist中
wlist 把连接成功的放在wlist列表中 

实现的方式有两种   

select.select(socket_list, conn_list, [], 0.005)
select监听的socket_list, conn_list内部会调用列表中的每一个值得fileno方法,获取该返回值并去系统中检测
方式一:
select.select([client1, client2], [client1, client2], [], 0.005)
方式二:高级版本 封装socket对象
select.select([Foo(client1), Foo(client2)], [Foo(client1), Foo(client2)], [], 0.005)

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 import socket
 5 import select
 6 
 7 client1 = socket.socket()
 8 client1.setblocking(False)
 9 try:
10     client1.connect(("www.baidu.com", 80))
11 except BlockingIOError as e:
12     pass
13 
14 client2 = socket.socket()
15 client2.setblocking(False)
16 try:
17     client2.connect(("www.so.com", 80))
18 except BlockingIOError as e:
19     pass
20 socket_list = [client1, client2]
21 conn_list = [client1, client2]
22 while True:
23     rlist, wlist, elist = select.select(socket_list, conn_list, [], 0.005)
24     for sk in wlist:
25         if sk == client1:
26             sk.sendall(bGET /s?wd=hello HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n)
27         else:
28             sk.sendall(bGET /s?q=hello HTTP/1.0\r\nhost:www.so.com\r\n\r\n)
29         conn_list.remove(sk)
30 
31     for sk in rlist:
32         chunk_list = []
33         while True:
34             try:
35                 chunk = sk.recv(8096)
36                 if not chunk:
37                     break
38                 chunk_list.append(chunk)
39             except BlockingIOError as e:
40                 break
41         body = b"".join(chunk_list)
42         print("----->", body.decode(utf8))
43         # print("----->", body)
44         sk.close()
45         socket_list.remove(sk)
46     if not socket_list:
47         break
48 
49 """
50 回调:异步本质(通知)
51 def callback(arg):
52     print(arg)
53 func("www.baidu.com/s?wd=hello" callback)
54 print(123)
55 print(123)
56 print(123)
57 """

 

 四、利用单线程实现高并发NB高级版本(高并发方法二异步非阻塞)

 

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 import socket
 5 import select
 6 
 7 
 8 class Req(object):
 9     def __init__(self, sk, func):
10         self.sock = sk
11         self.func = func
12 
13     def fileno(self):
14         return self.sock.fileno()
15 
16 
17 class Nb(object):
18 
19     def __init__(self):
20         self.conn_list = []
21         self.socket_list = []
22 
23     def add(self, url, func):
24         client = socket.socket()
25         client.setblocking(False)  
26         try:
27             client.connect((url, 80))
28         except BlockingIOError as e:
29             pass
30         obj = Req(client, func)
31         self.conn_list.append(obj)
32         self.socket_list.append(obj)
33 
34     def run(self):
35         while True:
36             rlist, wlist, elist = select.select(self.socket_list, self.conn_list, [], 0.005)
37             for sk in wlist:
38                 sk.sock.sendall(bGET /s?wd=hello HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n)
39                 self.conn_list.remove(sk)
40 
41             for sk in rlist:
42                 chunk_list = []
43                 while True:
44                     try:
45                         chunk = sk.sock.recv(8096)
46                         if not chunk:
47                             break
48                         chunk_list.append(chunk)
49                     except BlockingIOError as e:
50                         break
51                 body = b"".join(chunk_list)
52                 sk.func(body)
53                 sk.sock.close()
54                 self.socket_list.remove(sk)
55             if not self.socket_list:
56                 break
57 
58 
59 def baidu_response(body):
60     print(百度下载的结果, body)
61 
62 
63 def sogou_response(body):
64     print(搜狗下载的结果, body)
65 
66 
67 t1 = Nb()
68 t1.add(www.baidu.com, baidu_response)
69 t1.add(www.so.com, sogou_response)
70 t1.run()

 

 总结:

IO多路复用的作用: 检测多个socket是否发生变化
操作系统检测socket是否发生变化 有三种模式
select:最多1024个socket个数 循环去检测。
poll: 不限制监听socket个数,循环去检测。(水平触发)
epoll: 不限制监听socket个数,回调的方式(边缘触发) windows下没有 触发的机制可以采用简单点儿的高低电平来理解
    linux下
select.epoll 有点复杂注册哪些等等操作 用法差不多select.select
            提高并发方案:多进程 多线程 异步非阻塞模块(Twisted)
什么是异步非阻塞?
   非阻塞, 不等待 比如:创建socket对某个地址进行连接,获取接收数据时就会等待,连接成功/接收到数据时,
才执行后续操作,如果设置setblocking(False),上边两过程不再等待,但是要报错BlockingError,只要去捕获即可
异步:(通知)执行完成之后自动执行回调函数或者自动执行某些操作(通知)最根本就是执行完某个任务自动调用我给他的函数
比如:爬虫中向某个地址发送请求,当请求执行完成之后自动执行回调函数

什么是同步阻塞?
同步:按照顺序逐步执行
阻塞:等待

    这么多代码其实可以不用写 基于异步非阻塞的框架 基于事件循环(驱动) Twisted

 

 

以上是关于Python之并发并行阻塞非租塞同步异步IO多路复用的主要内容,如果未能解决你的问题,请参考以下文章

进程之并行并发同步异步与阻塞非阻塞

并行,并发,串行,同步,异步,阻塞,非阻塞,同步阻塞,同步非阻塞,异步阻塞,异步非阻塞

Python学习:并发编程之IO模型

并发编程之IO模型

并发并行串行同步异步阻塞非阻塞

Python并发编程之同步异步and阻塞非阻塞