11.python并发入门(part15 关于I/O多路复用)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了11.python并发入门(part15 关于I/O多路复用)相关的知识,希望对你有一定的参考价值。

一、为什么要产生I/O多路复用?

两个主机之间通信,主机A和主机B都需要开启socket,主机A首先要等待客户端来进行连接,这是会发起一个recvfrom的系统调用,如果主机B一直没有去连接主机A,没有给主机A发送任何数据,进程就会被阻塞,无法去做其他的事情(默认的阻塞I/O模型),一直阻塞到主机B去连接主机A,主机A收到了这个连接。

其实这种模式在单服务器,单客户端(两台主机之间单独通信)没有什么问题,如果说是多并发场景呢?服务端阻塞与第一个客户端发来的socket对象中,另外一个客户端2要发送数据给服务端,其实服务端还阻塞于和第一个客户端建立的连接,还在等待第一个客户端发来数据,无法处理其他客户端的数据,此时问题就出现了!!

客户端1的数据没有处理完,就无法处理客户端2发来的连接,有人会想到多线程,有人会想到多进程,当并发比较大的情况下,使用I/O多路复用也是不错的选择。


使用了I/O多路复用后,会出现怎样的效果?

还用刚才的例子,假如有多个客户端连接,同时监听n个客户端的socket,当其中有一个客户端发来消息,socket的状态就会产生变化,select/epoll就会检测到这个变化,select/epoll会将发生变化的I/O流(socket)返回,然后拿到select/epoll返回的内容,返回的内容便是发生变化的socket,我们只需要从这些发生变化的I/O流去接收数据就可以了。

接收完消息后,继续回到select/epoll开始重新阻塞。

这样避免了一直阻塞在一个客户端,而不能处理其他客户端的连接。


还有两点需要特别注意!!

I/O多路复用存在的意义,就是解决对多个I/O进行监听时,期中一个I/O阻塞影响到其他I/O的问题,这个概念跟多线程并没有什么关系~~~


和多线程去比较的话,多线程之间进行切换,也是有很大的资源消耗的,虽然没有多进程那么大,I/O多路复用和多线程比的优势,就是I/O多路复用无需切换线程和进程,在针对多并发场景,效率更高!

拿nginx来举例,nginx使用的就是I/O多路复用,性能极佳。

但是多线程,多进程在代码的处理逻辑上,要比I/O多路复用简单的多。







二、简单了解I/O多路复用。

在没有使用I/O多路复用之前,每进来一个I/O流(socket),会分配一个新的线程或者进程去管理。

使用了I/O多路复用后,单个进程, 通过记录每个I/O流(socket)的状态,来同时对多个I/O流进行管理。

I/O多路复用是通过单进程,单线程来实现的,本质是通过单个线程,记录,跟踪每一个I/O流(socket)的状态,来同时管理这些I/O流。



I/O多路复用的好处就在于单个进程,可以同时处理多个网络连接的I/O,其实就是又select/epoll这两个函数去不断的轮询所有的socket,一旦有socket的状态发生变化,就会去通知用户进程(程序)。

I/O多路复用的基本运行流程图如下:

技术分享

首先由select函数发起一个系统调用,这个系统调用也叫select,一但调用了这个select,整个进程就会进入阻塞状态,此时,内核回去检测,所有select负责的socket,当select负责的这些socket中,其中只要有一个socket的状态发生了改变,那么select就会立刻返回。用户就可以收到来自这个socket发来的数据。

其实select这个图和阻塞I/O很像,貌似没什么区别,但是,使用select是由优势的,select可以同时处理多个连接。

如果处理的并发连接数不是很高的情况下,使用这种IO多路复用,可能还没有多线程+IO阻塞的性能好,甚至可能会加大延迟。

注意!!IO多路复用select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

在I/O多路复用的模式中,每一个socket,一般都设置为非阻塞,但是整个用户的进程其实是一直被阻塞的!和默认的I/O阻塞不同,这个阻塞是因为进程被select函Lock导致的,和I/O造成的阻塞其实不是一回事。

一旦select函数返回的结果中有内容可以拿了,那么进程就可以调用accpet或者recv将位于内核空间的数据copy到用户空间中。

只有在处理特别多的连接数时,才需要考虑使用select函数,如果只处理单个连接,无法体现出select的优势!

下面是select函数基本用法的一个示例:

服务端:

#!/usr/local/bin/python2.7

# -*- coding:utf-8 -*-

import socket

import select

socket_obj = socket.socket()

socket_obj.bind(("127.0.0.1",8889))

socket_obj.listen(5)

while True:

    r,w,e = select.select([socket_obj,],[],[],5) #用来检测socket_obj这个socket对象是否产生了变化,如果有变化,返回给变量r,(5代表5秒后,即使socket没有任何变化,代码也会向下执行!)

#一旦这个socket产生了变化之后,变量r就有内容了。

    for i in r:

        print "conn ok!"

    print (‘wait.....‘)

客户端:

#!/usr/local/bin/python2.7

# -*- coding:utf-8 -*-

import socket

socket_client = socket.socket()

socket_client.connect(("127.0.0.1",8889))

while True:

    inpt = raw_input(">>>").strip()

    socket_client.send(inpt.encode("utf-8"))

    data = socket_client.recv(1024)

    print data.decode("utf8")




三、select,poll,epoll 三种I/O多路复用方式的介绍。

1.关于select。

1.1 select简介:

select函数通过一个名为select的系统调用,去监控具有文件描述符的列表,一旦其中的某个文件描述符发生了变化,select函数就会返回这个文件描述符,使程序获得这个文件描述符,然后做一些后续操作。

下面介绍下select这种I/O多路复用的缺点。

单个进程中,可以监视的最大文件描述符,只有1024个(这只是针对linxu操作系统),不过据说是可以修改。

还有就是,select监听的文件描述符如果很多的话,开销也会变得非常大,这是跟select的内部触发机制是由直接关系的,因为select使用的是水平触发。(关于水平触发和边缘触发的概念,本文的后面会进行介绍),如果暂时不知道什么是水平触发,没关系,现在只需要知道,select内部维护了一个死循环,这个死循环会轮询检测每个文件描述符的状态是否发生变化(这种检测的方式是一个一个的去检测,也叫线性扫描)

1.2当我们调用了select()函数时,都做了哪些操作?

首先上下文切换,从用户态切换为内核态。

然后将I/O流(socket)(连接对象)从用户控件复制到内核空间。

由内核去轮询(水平轮询),所有的I/O流是否有发生变化。

如果有变化,返回发生变化的I/O流。

将I/O流从内核空间复制到用户空间。


1.3关于select模块中select函数的使用方法介绍。

s_r_list, s_w_list, s_e_list = select.select(rlist, wlist, xlist, [timeout])

selcet 一共有4个参数,其中有三个是必须填的。

下面是这四个参数的官方解释。

rlist:(wait until ready for reading)等待准备读取的内容。(个人理解就是select的监听队列)

wlist:(wait until ready for writing)等待准备写入的内容。

xlist:(wait for an exceptional condition)等待一个特殊条件

timeout:超时时间(如果超出了超时时间,即使I/O流没有任何变化,代码也会向下执行!不会阻塞在这里!)当超时时间为空,则select会一直阻塞,直到监听的句柄发生变化。

select函数中的第一个返回值是最常用的!!

s_r_list:当select中的rlist中监听的I/O流如果发生了变化,就会把发生变化的I/O流,返回到这个列表中。


1.4使用select这种I/O多路复用的模式,监听多连接,实现并发的例子。

服务端ver1:

#!/usr/local/bin/python2.7

# -*- coding:utf-8 -*-

import select

import socket

socket_server_obj = socket.socket()

socket_server_obj.bind((‘127.0.0.1‘,8989))

socket_server_obj.listen(100)

input_list = [socket_server_obj,]

message_dic = {}#使用字典来代替一个消息队列。

out_list = []

while True:

    s_r_list,s_w_list,s_e_list = select.select(input_list,out_list,[])

    print "正在监听的socket对象有 %d 个" %(len(s_r_list))

    for r_socket in s_r_list:

        if r_socket == socket_server_obj:  #判断发生改变的I/O流是自己的socket还是客户端的socket(客户端连接)

            conn,address = r_socket.accept()  #如果发现自己的socket发生了改变,只能说明一种情况,就是有新的客户端连接进来了。

            input_list.append(conn) #将客户端的socket(也就是客户端的连接对象)加入到监听select的监听列表中。

            message_dic[conn] = [] #使用字典来代替消息队列,为每一个客户端连接对象提供一个消息列表。

        else: #if判断一个r_socket无非只有两种情况,第一种就是服务端自己的socket,第二种就是客户端的socket(客户端连接),r_socket是客户端连接,那么就会执行这个else分支。

            try:

                data = r_socket.recv(1024)#等待接收客户端发来的消息

            except Exception as error:

                input_list.remove(r_socket) 

            else: #当try代码块中的代码执行没有任何异常后,执行else代码块,在这个程序中,执行了else代码块,就说明服务端成功收到了客户端发来的数据。

                message_dic[r_socket].append(data) #把客户端发来的数据放进队列。

                out_list.append(r_socket) #将发送过数据的客户端连接对象(客户端socket)添加到out_list列表中以便select区分。

    for conn in s_w_list: #out_list这个列表传进select函数后,会返回给第二个返回值,这个操作就是在循环select函数的第二个返回值的列表,这样就是为了区分哪些连接给服务端发送过数据。

        recv_msg = message_dic[conn][0] #从字典模拟的消息队列中,取出客户端发来的数据。

        del message_dic[conn][0] #从队列中取出后删除

        conn.send(recv_msg+" ok!!")

        out_list.remove(conn)

    for err_socket in s_e_list: #将发生错误的socket文件描述符,移出监听列表。

        input_list.remove(err_socket)


服务端ver2:

#!/usr/local/bin/python2.7

# -*- coding:utf-8 -*-

import select

import socket

import Queue

socket_server_obj = socket.socket()

socket_server_obj.bind((‘127.0.0.1‘,8999))

socket_server_obj.listen(5)

socket_server_obj.setblocking(False)

#将socket设置为非阻塞模式.

#一旦设置成非阻塞,执行accept就会报错,必须有连接过来时,才可以accpet!

in_put = [socket_server_obj,]

msg_dic = {}

input_list = [socket_server_obj,]

#select监听的I/O流的列表.

output_list = [] #放入这里面的I/O流,下次select去检测的时候,会从select的第二个返回值当中拿出来.

while True:

    print "new-----"

    s_r_list,s_w_list,s_e_list = select.select(input_list,output_list,input_list)

    print s_r_list,s_w_list,s_e_list

    for socket_or_conn in s_r_list:

        if socket_or_conn == socket_server_obj:

            conn,addr = socket_or_conn.accept()

            print "new client %s port %s"  %(addr)

            input_list.append(conn) #将客户端socket假如到select监听列表中.

            print input_list

            #一旦客户端socket发生变化,那么就说明客户端有数据发来了.

            msg_dic[conn] = Queue.Queue() #创建一个队列.这个队列中存有要返回给每个客户端的数据.

            print msg_dic

        else:

            try:

                print "recv!!"

                data = socket_or_conn.recv(1024)

                print data

                #服务端收到了来自客户端的数据后,如果想给客户端发数据,这个时候需要注意!!千万不可以直接发!!

                #如果数据直接send给客户端,如果客户端没有接受的话,数据就没了!!

                #如果想给客户端回复数据,要将数据放到指定客户端的队列中.

                msg_dic[socket_or_conn].put(data)

                #在队列里面放入数据

                output_list.append(socket_or_conn)

                #要发送的数据放入队列后,将这个客户端的socket加入到返回连接的队列中.

            except Exception as e:

                print "client disconnect!!"

                if socket_server_obj in output_list:

                    output_list.remove(socket_server_obj)

                input_list.remove(socket_server_obj)

                del msg_dic[socket_server_obj]

    for w_sock in s_w_list: #这个列表中存有,有服务端需要去回数据的客户端连接.

        data_to_client = msg_dic[w_sock].get()

        w_sock.send(data_to_client)

        output_list.remove(w_sock) #成功给客户端回复了数据后,把客户端连接从返回队列中删除,确保下次循环的时候不返回这个已经处理完的连接了

    for e_sock in s_e_list:  

        if e_sock in output_list:

            output_list.remove(e_sock)

        input_list.remove(e_sock)

        del msg_dic[e_sock]


客户端:

#!/usr/local/bin/python2.7

# -*- coding:utf-8 -*-

import socket

client = socket.socket()

client.connect(("127.0.0.1", 8999))

while True:

    cmd = raw_input(‘>>> ‘).strip()

    if len(cmd) == 0 :

        continue

    client.send(cmd)

    data = client.recv(1024)

    print data

client.close()


2.关于poll

关于poll,没什么好介绍的,内部实现机制基本和select一致,都是线性扫描外加水平触发,与select不同的是,poll没有最大文件描述符的限制。


3.关于epoll

首先,epoll也没有最大文件描述符数量的限制,最重要的是epoll支持两种触发机制,“边缘触发”,“水平触发”。

(在这先简单的理解下边缘触发的概念,epoll中的边缘触发,就是指,只告诉用户进程,刚刚哪个文件描述符的状态发生改变,它只说一遍,如果我们没有采取行动,那么它将不会再次告知。这就是边缘触发。)


3.1 epoll与select的比较。

在说epoll和select的差别之前,首先需要有一个I/O流的概念,什么是I/O流?

一个文件,或者socket,pipe,只要能执行I/O操作的内核对象,都属于I/O流。

我们可以从流中写入数据,也可以从流中读取数据。

现在假定一个场景,我们要从一个流中,要读取数据,但是这个流中并没有数据(比如socket中的accpet,recv)这时,该如何处理?

此处就涉及到前面说过的I/O阻塞模型了。

阻塞:一直阻塞,直到有数据。(直到从I/O流中读取到了数据,才回去做别的事)


非阻塞:不停的循环,轮询,询问内核是否有将数据准备好。(这就好比,在等快递,每分钟给快递员打个电话,每分钟文快递员一次,快递到了没。)


其实从某种角度来说,第一种阻塞模式,是有他的优势的!它消耗cpu时间消耗的非常少,如果线程sleep了,暂时就不会去浪费cpu的时间分片了。


为了更清晰的了解,阻塞是如何进行的,接下来,来说说内核的缓冲区。

内核缓冲区的存在,是为了减少频繁的I/O操作引发的系统调用。

假如说有一个管道,进程A是管道一端的写入方,进程B是管道另一端的读取方,再假设一开始,内核的缓冲区是空的,B要读取数据,但是A还没有发送,这个时候,B是阻塞的,一直等待A进程的写入,此时,内核的缓冲区会从空变为非空状态,这时,内核会产生一个“缓冲区非空的事件”,告诉用户进程B,有数据进来了,进程B就会从阻塞状态中被唤醒。

那么假如这个事件通知,到达进程B,但是进程B还是没有读取数据,此时A写入的数据会滞留在内核缓冲区,此时,内核的缓冲区如果满了,B进程依旧没有读数据,最终内核缓冲区会被填满,这个时候会产生一个“内核缓冲区满”的I/O事件,这个事件会通知给进程A。(此时A会阻塞)

接着假设,此时进程B开始读取数据了,此时缓冲区有了空位,此时内核会发起一个“缓冲区非满”的事件通知,给进程A,此时进程A会被唤醒,继续开始写数据。


上面这四个情景,涵盖了四个不同的I/O事件,分别是“缓冲区满”,“缓冲区为空”,“缓冲区非满”,“缓冲区非空”,这就是I/O阻塞发生的根本。


前面说过了,阻塞I/O的效率不高,一个线程同时只能处理一个I/O流,如果想同时处理多个I/O流,需要使用多进程or多线程,不过不管是多进程还是多线程,效率其实都不高。


然后是非阻塞I/O,虽然同时可以处理多个I/O流,但是,但是需要不停的去“询问”所有的I/O流,如果I/O流中没有数据,那么只会白白浪费CPU的资源。


后来,出现了select,poll以及epoll,这三个机制很好的解决了非阻塞I/O需要不停的去“询问”所有I/O流的问题,它们可以同时监听多个I/O流,当处于空闲状态时,程序会阻塞,当出现了一个或者多个I/O事件时,程序就会从阻塞的状态醒过来。


把前面的概念简单回顾一下,接下来就可以将select和epoll的区别了。

epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。 

比如100个连接,有两个活跃了,epoll会告诉用户这两个两个活跃了,直接取就ok了,而select是循环一遍。

epoll可以理解为event poll,epoll之会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的。

epoll的解决方案,主要在epoll_ctl这个函数中,每次产生新的I/O事件到epoll句柄时,会把所有的文件描述符,拷贝到内核空间,epoll保证了每个文件描述符在整个过程中只会拷贝一次。



epoll的常用操作如下:

select.epoll(sizehint=-1, flags=0) 创建epoll对象。

epoll.close()

Close the control file descriptor of the epoll object.关闭epoll对象的文件描述符

epoll.closed

True if the epoll object is closed.检测epoll对象是否关闭

epoll.fileno()

Return the file descriptor number of the control fd.返回epoll对象的文件描述符

epoll.fromfd(fd)

Create an epoll object from a given file descriptor.根据指定的fd创建epoll对象

epoll.register(fd[, eventmask])

Register a fd descriptor with the epoll object.向epoll对象中注册fd和对应的事件

epoll.modify(fd, eventmask)

Modify a registered file descriptor.修改fd的事件

epoll.unregister(fd)

Remove a registered file descriptor from the epoll object.取消注册

epoll.poll(timeout=-1, maxevents=-1)

Wait for events. timeout in seconds (float)阻塞,直到注册的fd事件发生,会返回一个dict,格式为:{(fd1,event1),(fd2,event2),……(fdn,eventn)}


事件:

EPOLLIN    Available for read 可读   状态符为1

EPOLLOUT    Available for write 可写  状态符为4

EPOLLPRI    Urgent data for read

EPOLLERR    Error condition happened on the assoc. fd 发生错误 状态符为8

EPOLLHUP    Hang up happened on the assoc. fd 挂起状态

EPOLLET    Set Edge Trigger behavior, the default is Level Trigger behavior 默认为水平触发,设置该事件后则边缘触发

EPOLLONESHOT    Set one-shot behavior. After one event is pulled out, the fd is internally disabled

EPOLLRDNORM    Equivalent to EPOLLIN

EPOLLRDBAND    Priority data band can be read.

EPOLLWRNORM    Equivalent to EPOLLOUT

EPOLLWRBAND    Priority data may be written.

EPOLLMSG    Ignored.


#由于博主现在的系统环境不支持,无法亲自写示例,所以在网上找了一个。

服务端:

import socket

import select

s = socket.socket()

s.bind((‘127.0.0.1‘,8888))

s.listen(5)

epoll_obj = select.epoll()

epoll_obj.register(s,select.EPOLLIN)

connections = {}

while True:

    events = epoll_obj.poll()

    for fd, event in events:

        print(fd,event)

        if fd == s.fileno():

            conn, addr = s.accept()

            connections[conn.fileno()] = conn

            epoll_obj.register(conn,select.EPOLLIN)

            msg = conn.recv(200)

            conn.sendall(‘ok‘.encode())

        else:

            try:

                fd_obj = connections[fd]

                msg = fd_obj.recv(200)

                fd_obj.sendall(‘ok‘.encode())

            except BrokenPipeError:

                epoll_obj.unregister(fd)

                connections[fd].close()

                del connections[fd]

s.close()

epoll_obj.close()


客户端:

import socket

flag = 1

s = socket.socket()

s.connect((‘127.0.0.1‘,8888))

while flag:

    input_msg = input(‘input>>>‘)

    if input_msg == ‘0‘:

        break

    s.sendall(input_msg.encode())

    msg = s.recv(1024)

    print(msg.decode())

s.close()

select只支持水平触发,但是epoll同时支持了边缘触发和水平触发。


在linux的IO多路复用中有水平触发,边缘触发两种模式,这两种模式的区别如下:

水平触发:当文件描述符准备就绪,可以执行I/O操作了,就会触发通知,允许在任意时刻重复检测IO的状态。

另一种说法,就是当被监控的文件描述符,有可读事件发生的时候,epoll.poll()会通知处理程序去读写,如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll.poll()时,它还会通知你在上次没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!

如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率!!! 优点很明显:稳定可靠


边缘触发:当被监控的文件描述符上有可读写事件发生时,epoll.poll()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll.poll()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,但是感觉不太可靠。


下面我们还从电子的角度来解释一下:

 

 水平触发:也就是只有高电平(1)或低电平(0)时才触发通知,只要在这两种状态就能得到通知.上面提到的只要有数据可读(描述符就绪)那么水平触发的epoll就立即返回.


 边缘触发:只有电平发生变化(高电平到低电平,或者低电平到高电平)的时候才触发通知.上面提到即使有数据 可读,但是没有新的IO活动到来,epoll也不会立即返回.






本文出自 “reBiRTH” 博客,请务必保留此出处http://suhaozhi.blog.51cto.com/7272298/1928120

以上是关于11.python并发入门(part15 关于I/O多路复用)的主要内容,如果未能解决你的问题,请参考以下文章

11.python并发入门(part11 进程同步锁,以及进程池,以及callback的概念)

11.python并发入门(part4 死锁与递归锁)

11.python并发入门(part1 初识进程与线程,并发,并行,同步,异步)

11.python并发入门(part7 线程队列)

11.python并发入门(part12 初识协程)

11.python并发入门(part5 event对象)