经典5种IO模型 | IO多路复用

Posted 逸鹏说道

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了经典5种IO模型 | IO多路复用相关的知识,希望对你有一定的参考价值。

  • 3.2.概念篇

  • 1.同步与异步

  • 2.阻塞与非阻塞

  • 3.五种IO模型

  • 4.Unix图示

  • 3.3.IO多路复用

  • 1.Select

  • 2.EPoll

  • 3.通用写法( Selector


上篇回顾:

3.2.概念篇

1.同步与异步

同步是指一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成。

异步是指不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作。然后继续执行下面代码逻辑,只要自己完成了整个任务就算完成了(异步一般使用状态、通知和回调)

PS:项目里面一般是这样的:(个人经验)

  1. 同步架构:一般都是和钱相关的需求,需要实时返回的业务

  2. 异步架构:更多是对写要求比较高时的场景(同步变异步)

    • 读一般都是实时返回,代码一般都是 awaitxxx()

  3. 想象个情景就清楚了:

    • 异步:现在用户写了篇文章,可以异步操作,就算没真正写到数据库也可以返回:发表成功(大不了失败提示一下)

    • 同步:用户获取订单信息,你如果异步就会这样了:提示下获取成功,然后一片空白...用户不卸载就怪了...

2.阻塞与非阻塞

阻塞是指调用结果返回之前,当前线程会被挂起,一直处于等待消息通知,不能够执行其他业务(大部分代码都是这样的)

非阻塞是指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回(继续执行下面代码,或者重试机制走起)

PS:项目里面重试机制为啥一般都是3次?

  1. 第一次重试,两台PC挂了也是有可能的

  2. 第二次重试,负载均衡分配的三台机器同时挂的可能性不是很大,这时候就有可能是网络有点拥堵了

  3. 最后一次重试,再失败就没意义了,日记写起来,再重试网络负担就加大了,得不偿失了

3.五种IO模型

  1. 准备数据

由于存在这两个阶段,Linux产生了下面五种IO模型( socket为例

  1. 阻塞式IO:

    • Linux中默认情况下所有的socket都是阻塞的

  2. 非阻塞式IO:

    • 当用户进程发出read操作时,如果 kernel中的数据还没有准备好,那么它并不会 block用户进程,而是立刻返回一个 error

    • 用户进程判断结果是一个 error时,它就知道数据还没有准备好,于是它可以再次发送read操作

    • 一旦 kernel中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回

    • 非阻塞IO模式下用户进程需要不断地询问内核的数据准备好了没有

  3. IO多路复用:

    • 通过一种机制,一个进程可以监视多个文件描述符(套接字描述符)一旦某个文件描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作(这样就不需要每个用户进程不断的询问内核数据准备好了没)

    • 常用的IO多路复用方式有 select、 poll和 epoll

  4. 信号驱动IO:(之前我们讲的时候说过)

    • 内核文件描述符就绪后,通过信号通知用户进程,用户进程再通过系统调用读取数据。

    • 此方式属于同步IO(实际读取数据到用户进程缓存的工作仍然是由用户进程自己负责的)

  5. 异步IO( POSIX的 aio_系列函数)

    • 用户进程发起read操作之后,立刻就可以开始去做其它的事。内核收到一个异步 IO read之后,会立刻返回,不会阻塞用户进程。

    • 内核会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,内核会给用户进程发送一个 signal告诉它read操作完成了

4.Unix图示

贴一下Unix编程里面的图:


非阻塞IO


IO复用

【经典】5种IO模型 | IO多路复用


信号IO

【经典】5种IO模型 | IO多路复用


异步AIO

3.3.IO多路复用

开始之前咱们通过非阻塞IO引入一下:(来个简单例子 socket.setblocking(False))

 
   
   
 
  1. import time

  2. import socket


  3. def select(socket_addr_list):

  4.    for client_socket, client_addr in socket_addr_list:

  5.        try:

  6.            data = client_socket.recv(2048)

  7.            if data:

  8.                print(f"[来自{client_addr}的消息:]\n")

  9.                print(data.decode("utf-8"))

  10.                client_socket.send(

  11.                    b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"

  12.                )

  13.            else:

  14.                # 没有消息是触发异常,空消息是断开连接

  15.                client_socket.close()  # 关闭客户端连接

  16.                socket_addr_list.remove((client_socket, client_addr))

  17.                print(f"[客户端{client_addr}已断开连接,当前连接数:{len(socket_addr_list)}]")

  18.        except Exception:

  19.            pass


  20. def main():

  21.    # 存放客户端集合

  22.    socket_addr_list = list()


  23.    with socket.socket() as tcp_server:

  24.        # 防止端口绑定的设置

  25.        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

  26.        tcp_server.bind(('', 8080))

  27.        tcp_server.listen()

  28.        tcp_server.setblocking(False)  # 服务端非阻塞

  29.        while True:

  30.            try:

  31.                client_socket, client_addr = tcp_server.accept()

  32.                client_socket.setblocking(False)  # 客户端非阻塞

  33.                socket_addr_list.append((client_socket, client_addr))

  34.            except Exception:

  35.                pass

  36.            else:

  37.                print(f"[来自{client_addr}的连接,当前连接数:{len(socket_addr_list)}]")

  38.            # 防止客户端断开后出错

  39.            if socket_addr_list:

  40.                # 轮询查看客户端有没有消息

  41.                select(socket_addr_list)  # 引用传参

  42.                time.sleep(0.01)


  43. if __name__ == "__main__":

  44.    main()

输出:

可以思考下:

  1. 为什么Server也要设置为非阻塞?

    • PS:一个线程里面只能有一个死循环,现在程序需要两个死循环,so ==> 放一起咯

  2. 断开连接怎么判断?

    • PS:没有消息是触发异常,空消息是断开连接

  3. client_socket为什么不用dict存放?

    • PS:dict在循环的过程中,del会引发异常

1.Select

select和上面的有点类似,就是轮询的过程交给了操作系统:

kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程

来个和上面等同的案例:

 
   
   
 
  1. import select

  2. import socket


  3. def main():

  4.    with socket.socket() as tcp_server:

  5.        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

  6.        tcp_server.bind(('', 8080))

  7.        tcp_server.listen()

  8.        socket_info_dict = dict()

  9.        socket_list = [tcp_server]  # 监测列表

  10.        while True:

  11.            # 劣势:select列表数量有限制

  12.            read_list, write_list, error_list = select.select(

  13.                socket_list, [], [])

  14.            for item in read_list:

  15.                # 服务端迎接新的连接

  16.                if item == tcp_server:

  17.                    client_socket, client_address = item.accept()

  18.                    socket_list.append(client_socket)

  19.                    socket_info_dict[client_socket] = client_address

  20.                    print(f"[{client_address}已连接,当前连接数:{len(socket_list)-1}]")

  21.                # 客户端发来

  22.                else:

  23.                    data = item.recv(2048)

  24.                    if data:

  25.                        print(data.decode("utf-8"))

  26.                        item.send(

  27.                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"

  28.                        )

  29.                    else:

  30.                        item.close()

  31.                        socket_list.remove(item)

  32.                        info = socket_info_dict[item]

  33.                        print(f"[{info}已断开,当前连接数:{len(socket_list)-1}]")


  34. if __name__ == "__main__":

  35.    main()

输出和上面一样

扩展说明:

select 函数监视的文件描述符分3类,分别是 writefdsreadfds、和 exceptfds。调用后select函数会阻塞,直到有描述符就绪函数返回(有数据可读、可写、或者有except)或者超时(timeout指定等待时间,如果立即返回设为null即可)

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024(64位=>2048)

然后Poll就出现了,就是把上限给去掉了,本质并没变,还是使用的 轮询

2.EPoll

epoll在内核2.6中提出(Linux独有),使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,采用监听回调的机制,这样在用户空间和内核空间的copy只需一次,避免再次遍历就绪的文件描述符列表

先来看个案例吧:(输出和上面一样)

 
   
   
 
  1. import socket

  2. import select


  3. def main():

  4.    with socket.socket() as tcp_server:

  5.        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

  6.        tcp_server.bind(('', 8080))

  7.        tcp_server.listen()


  8.        # epoll是linux独有的

  9.        epoll = select.epoll()

  10.        # tcp_server注册到epoll中

  11.        epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)


  12.        # key-value

  13.        fd_socket_dict = dict()


  14.        # 回调需要自己处理

  15.        while True:

  16.            # 返回可读写的socket fd 集合

  17.            poll_list = epoll.poll()

  18.            for fd, event in poll_list:

  19.                # 服务器的socket

  20.                if fd == tcp_server.fileno():

  21.                    client_socket, client_addr = tcp_server.accept()

  22.                    fd = client_socket.fileno()

  23.                    fd_socket_dict[fd] = (client_socket, client_addr)

  24.                    # 把客户端注册进epoll中

  25.                    epoll.register(fd, select.EPOLLIN | select.EPOLLET)

  26.                else:  # 客户端

  27.                    client_socket, client_addr = fd_socket_dict[fd]

  28.                    data = client_socket.recv(2048)

  29.                    print(

  30.                        f"[来自{client_addr}的消息,当前连接数:{len(fd_socket_dict)}]\n")

  31.                    if data:

  32.                        print(data.decode("utf-8"))

  33.                        client_socket.send(

  34.                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"

  35.                        )

  36.                    else:

  37.                        del fd_socket_dict[fd]

  38.                        print(

  39.                            f"[{client_addr}已离线,当前连接数:{len(fd_socket_dict)}]\n"

  40.                        )

  41.                        # 从epoll中注销

  42.                        epoll.unregister(fd)

  43.                        client_socket.close()


  44. if __name__ == "__main__":

  45.    main()

扩展:epoll的两种工作模式

LT(level trigger,水平触发)模式:当epollwait检测到描述符就绪,将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epollwait时,会再次响应应用程序并通知此事件。LT模式是默认的工作模式。 LT模式同时支持阻塞和非阻塞socket。

ET(edge trigger,边缘触发)模式:当epollwait检测到描述符就绪,将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epollwait时,不会再次响应应用程序并通知此事件。 ET是高速工作方式,只支持非阻塞socket(ET模式减少了epoll事件被重复触发的次数,因此效率要比LT模式高)

Code提炼一下:

  1. 实例化对象: epoll=select.epoll()

  2. 注册对象: epoll.register(tcp_server.fileno(),select.EPOLLIN|select.EPOLLET)

  3. 注销对象: epoll.unregister(fd)

PS: epoll不一定比 Select性能高,一般都是分场景的:

  1. 高并发下,连接活跃度不高时:epoll比Select性能高(eg:web请求,页面随时关闭)

  2. 并发不高,连接活跃度比较高:Select更合适(eg:小游戏)

  3. Select是win和linux通用的,而epoll只有linux有

其实IO多路复用还有一个 kqueue,和 epoll类似,下面的通用写法中有包含


3.通用写法( Selector

一般来说:Linux下使用epoll,Win下使用select(IO多路复用会这个通用的即可)

先看看Python源代码:

 
   
   
 
  1. # 选择级别:epoll|kqueue|devpoll > poll > select

  2. if 'KqueueSelector' in globals():

  3.    DefaultSelector = KqueueSelector

  4. elif 'EpollSelector' in globals():

  5.    DefaultSelector = EpollSelector

  6. elif 'DevpollSelector' in globals():

  7.    DefaultSelector = DevpollSelector

  8. elif 'PollSelector' in globals():

  9.    DefaultSelector = PollSelector

  10. else:

  11.    DefaultSelector = SelectSelector

实战案例:(可读和可写可以不分开)

 
   
   
 
  1. import socket

  2. import selectors


  3. # Linux下使用epoll,Win下使用select

  4. Selector = selectors.DefaultSelector()


  5. class Task(object):

  6.    def __init__(self):

  7.        # 存放客户端fd和socket键值对

  8.        self.fd_socket_dict = dict()


  9.    def run(self):

  10.        self.server = socket.socket()

  11.        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

  12.        self.server.bind(('', 8080))

  13.        self.server.listen()

  14.        # 把Server注册到epoll

  15.        Selector.register(self.server.fileno(), selectors.EVENT_READ,

  16.                          self.connected)


  17.    def connected(self, key):

  18.        """客户端连接时处理"""

  19.        client_socket, client_address = self.server.accept()

  20.        fd = client_socket.fileno()

  21.        self.fd_socket_dict[fd] = (client_socket, client_address)

  22.        # 注册一个客户端读的事件(服务端去读消息)

  23.        Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)

  24.        print(f"{client_address}已连接,当前连接数:{len(self.fd_socket_dict)}")


  25.    def call_back_reads(self, key):

  26.        """客户端可读时处理"""

  27.        # 一个fd只能注册一次,监测可写的时候需要把可读给注销

  28.        Selector.unregister(key.fd)

  29.        client_socket, client_address = self.fd_socket_dict[key.fd]

  30.        print(f"[来自{client_address}的消息:]\n")

  31.        data = client_socket.recv(2048)

  32.        if data:

  33.            print(data.decode("utf-8"))

  34.            # 注册一个客户端写的事件(服务端去发消息)

  35.            Selector.register(key.fd, selectors.EVENT_WRITE,

  36.                              self.call_back_writes)

  37.        else:

  38.            client_socket.close()

  39.            del self.fd_socket_dict[key.fd]

  40.            print(f"{client_address}已断开,当前连接数:{len(self.fd_socket_dict)}")


  41.    def call_back_writes(self, key):

  42.        """客户端可写时处理"""

  43.        Selector.unregister(key.fd)

  44.        client_socket, client_address = self.fd_socket_dict[key.fd]

  45.        client_socket.send(b"ok")

  46.        Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)


  47. def main():

  48.    t = Task()

  49.    t.run()

  50.    while True:

  51.        ready = Selector.select()

  52.        for key, obj in ready:

  53.            # 需要自己回调

  54.            call_back = key.data

  55.            call_back(key)


  56. if __name__ == "__main__":

  57.    main()

Code提炼一下:

  1. 实例化对象: Selector=selectors.DefaultSelector()

  2. 注册对象:

    • Selector.register(server.fileno(),selectors.EVENT_READ,call_back)

    • Selector.register(server.fileno(),selectors.EVENT_WRITE,call_back)

  3. 注销对象: Selector.unregister(key.fd)

  4. 注意一下:一个fd只能注册一次,监测可写的时候需要把可读给注销(反之一样)

业余拓展:

 
   
   
 
  1. select, iocp, epoll,kqueue及各种I/O复用机制

  2. https://blog.csdn.net/shallwake/article/details/5265287


  3. kqueue用法简介

  4. http://www.cnblogs.com/luminocean/p/5631336.html

下级预估:协程篇 or 网络深入篇


以上是关于经典5种IO模型 | IO多路复用的主要内容,如果未能解决你的问题,请参考以下文章

5种io模型摘要

五种网络IO模型以及多路复用IO中select/epoll对比

架构IO多路复用

架构IO多路复用

IO多路复用的三种机制Select,Poll,Epoll

7-4 并发编程IO多路复用常见考题