Python-IO多路复用

Posted Sch01aR#

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python-IO多路复用相关的知识,希望对你有一定的参考价值。

select实现socket server多并发服务器端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import socket
import select
import queue

server = socket.socket()
server.bind((\'127.0.0.1\', 9999))
server.listen()

server.setblocking(False)  # 要设置为非阻塞

input_list = [server, ]  # 本身也要检测
output_list = []
msg_dic = {}

while True:
    stdinput, stdoutput, stderr = select.select(input_list, output_list, input_list)
    # 第一个input_list参数为要用的连接,ouput_list为可能返回的连接,第二个input_list为可能报错的连接
    # stdinput为连接的地址, stdoutput为返回的连接, stderr为错误的连接
    print(stdinput, stdoutput, stderr)
    try:
        for r in stdinput:
            if r is server:  # 来了个新连接
                conn, addr = server.accept()
                print(\'当前连接客户端:\', addr)
                input_list.append(conn)
                msg_dic[conn] = queue.Queue()  # 初始化一个队列,用来储存要返回给客户端的数据
            else:
                    data = r.recv(1024)
                    print(\'收到数据\', data)
                    msg_dic[r].put(data)
                    output_list.append(r)  # 放入返回的连接队列里
    except socket.error:
        print(\'客户端断开连接\')
        break

    for w in stdoutput:  # 要返回给客户端的连接列表
        data_to_client = msg_dic[w].get()
        w.send(data_to_client)  # 把数据发送给客户端
        output_list.remove(w)  # 确保下次循环的时候stdoutput不返回已经处理完的连接

    for e in stderr:
        if e in output_list:
            output_list.remove(e)
        input_list.remove(e)
        server.close()
        del msg_dic[e]

客户端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import socket

client = socket.socket()
client.connect((\'127.0.0.1\', 9999))

while True:
    msg = input(\'>>>:\').strip()
    if len(msg) ==0:continue
    client.send(msg.encode(\'utf-8\'))
    data = client.recv(1024)
    print(data)

selector模块

selector模块可以使用select和epoll,它会根据所处的平台来选出最适合的I/O多路复用机制,在windows下为select,在linux下为epoll

通过selector模块实现单线程上万并发的socket server

服务器端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()
    print(\'当前连接客户端\', addr)
    conn.setblocking(False)  # 把连接设置为非阻塞
    sel.register(conn, selectors.EVENT_READ, read)  # 新连接注册read回调函数,如果新的连接发送数据就执行read函数

def read(conn, mask):
    data = conn.recv(1024)
    if data:
        print(\'收到数据:\',data)
        conn.send(data)
    else:
        print(\'客户端关闭\', conn)
        sel.unregister(conn)  # 注销注册的事件
        conn.close()

server = socket.socket()
server.bind((\'localhost\', 9999))
server.listen()
server.setblocking(False)
sel.register(server, selectors.EVENT_READ, accept)  # 注册一个事件,如果来了连接,就调用accept函数
# EVENT_READ,表示可读,值mask为1,EVENT_WRITE表示可写,值mask为2

while True:
    events = sel.select()  # 调用epoll或select,默认阻塞,有活动的连接就返回活动的连接列表
    for key, mask in events:
        callback = key.data  # 回调函数,即accept函数
        callback(key.fileobj, mask)  # key.fileobj为文件句柄,即还没建立连接的socket实例

sel.close() # 最后要关闭,确保所有的资源被释放

客户端

# -*- coding:utf-8 -*-
__author__ = "MuT6 Sch01aR"

import socket
import sys

msg = [
        b\'python\',
        b\'php\',
        b\'java\',
      ]

server_address = (\'127.0.0.1\', 9999)

socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(300)]

print(\'connecting to %s port %s\' % server_address)
for s in socks:
    s.connect(server_address)

for m in msg:
    for s in socks:
        print(\'%s: sending "%s"\' % (s.getsockname(), m))
        s.send(m)

    for s in socks:
        data = s.recv(1024)
        print(\'%s: received "%s"\' % (s.getsockname(), data))
        if not data:
            print(sys.stderr, \'closing socket\', s.getsockname())

服务器端运行结果

客户端运行结果

300个socket连接1秒左右就全结束了

以上是关于Python-IO多路复用的主要内容,如果未能解决你的问题,请参考以下文章

python-IO多路复用的selectors模块

python-IO多路复用之epoll

计算机网络3.1 运输层概述 与 多路复用/分解

请不要再说NIO和多路复用IO是同一个东西了(内含BIONIO多路复用NettyAIO案例测试代码)

为啥 4 位多路复用器测试台代码给出 x?

Kotlin 协程协程中的多路复用技术 ② ( select 函数原型 | SelectClauseN 事件 | 查看挂起函数是否支持 select )