select模块

Posted wodeboke-y

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了select模块相关的知识,希望对你有一定的参考价值。

select模块

 

 

1.      select模块

源:select.py

This module provides access to the select() and poll() functions available in most operating systems, devpoll() available on Solaris and derivatives, epoll() available on Linux 2.5+ and kqueue() available on most BSD.

该模块中select() and poll()方法在大多数操作系统中可用,devpoll()在Solaris and derivatives中可用,epoll()在Linux2.5+中可用,kqueue()在大多数bsd中可用。

Note that on Windows, it only works for sockets; on other operating systems, it also works for other file types (in particular, on Unix, it works on pipes). It cannot be used on regular files to determine whether a file has grown since it was last read.

注意:在win系列中,它只对sockets有用;在其它平台,file types也可使用(看起来类似pipes),它不能用于确定一个文件的大小是否增长。

 

1.1.    定义对象

模块定义了以下对象:

1.1.1.   exception select.error

A deprecated alias of OSError.

Changed in version 3.3: Following PEP 3151, this class was made an alias of OSError.

异常类,略。

 

1.1.2.   select.devpoll()

(Only supported on Solaris and derivatives.) Returns a /dev/poll polling object;

 

1.1.3.   select.epoll(sizehint=-1, flags=0)

(Only supported on Linux 2.5.44 and newer.) Return an edge polling object, which can be used as Edge or Level Triggered interface for I/O events.

sizehint informs epoll about the expected number of events to be registered. It must be positive, or -1 to use the default. It is only used on older systems where epoll_create1() is not available; otherwise it has no effect (though its value is still checked).

flags is deprecated and completely ignored. However, when supplied, its value must be 0 or select.EPOLL_CLOEXEC, otherwise OSError is raised.

See the Edge and Level Trigger Polling (epoll) Objects section below for the methods supported by epolling objects.

epoll objects support the context management protocol: when used in a with statement, the new file descriptor is automatically closed at the end of the block.

The new file descriptor is non-inheritable.

Changed in version 3.3: Added the flags parameter.

Changed in version 3.4: Support for the with statement was added. The new file descriptor is now non-inheritable.

Deprecated since version 3.4: The flags parameter. select.EPOLL_CLOEXEC is used by default now. Use os.set_inheritable() to make the file descriptor inheritable.

 

 

1.1.4.   select.poll()

(Not supported by all operating systems.) Returns a polling object, which supports registering and unregistering file descriptors, and then polling them for I/O events; see section Polling Objects below for the methods supported by polling objects.

 

1.1.5.   select.kqueue()

(Only supported on BSD.) Returns a kernel queue object; see section Kqueue Objects below for the methods supported by kqueue objects.

The new file descriptor is non-inheritable.

Changed in version 3.4: The new file descriptor is now non-inheritable.

 

1.1.6.   select.kevent(ident, filter=KQ_FILTER_READ, flags=KQ_EV_ADD, fflags=0, data=0, udata=0)

(Only supported on BSD.) Returns a kernel event object; see section Kevent Objects below for the methods supported by kevent objects.

 

1.1.7.   select.select(rlist, wlist, xlist[, timeout])

This is a straightforward interface to the Unix select() system call.

它是一个直达式接口,指向unix select()系统调用。

The first three arguments are sequences of ‘waitable objects’: either integers representing file descriptors or objects with a parameterless method named fileno() returning such an integer:

前三个参数是“可等待对象”序列,要么是整数-用于描述文件描述符;要么是一个名为fileno()无参数方法,它返回一个整数,作用同上。

rlist: wait until ready for reading 等待直到可以读

wlist: wait until ready for writing 等待至可写

xlist: wait for an “exceptional condition” (see the manual page for what your system considers such a condition) 等待一个异常条件

Empty sequences are allowed, but acceptance of three empty sequences is platform-dependent. (It is known to work on Unix but not on Windows.)

空序列是可以的,但具体情况依赖于平台。unix支持,windows不。

The optional timeout argument specifies a time-out as a floating point number in seconds. When the timeout argument is omitted the function blocks until at least one file descriptor is ready. A time-out value of zero specifies a poll and never blocks.

The return value is a triple of lists of objects that are ready: subsets of the first three arguments. When the time-out is reached without a file descriptor becoming ready, three empty lists are returned.

可选参数time-out指定一个浮点数,单位秒。

当它未指定时,函数阻塞直至至少一个fd就绪。

它为0意为查询一次,且不阻塞,立即返回。

返回值是三个就绪对象列表组合,它是三个参数的子集。

当超时点到达,且无一个fd就绪,返回三个空列表。

Among the acceptable object types in the sequences are Python file objects (e.g. sys.stdin, or objects returned by open() or os.popen()), socket objects returned by socket.socket(). You may also define a wrapper class yourself, as long as it has an appropriate fileno() method (that really returns a file descriptor, not just a random integer).

Note:File objects on Windows are not acceptable, but sockets are. On Windows, the underlyingselect() function is provided by the WinSock library, and does not handle file descriptors that don’t originate from WinSock.

注意事项:对于本函数而言,windows fd是不可接受对象,但sokets可以。

在windows中,select()底层实现由WinSock library提供,不能处理非源自Winsock的fd。

 

1.1.8.   select.PIPE_BUF

The minimum number of bytes which can be written without blocking to a pipe when the pipe has been reported as ready for writing by select(), poll() or another interface in this module. This doesn’t apply to other kind of file-like objects such as sockets.

This value is guaranteed by POSIX to be at least 512.

Availability: Unix

 

2.      案例

2.1.    select()

总结:

实测中发现socket一直是可读状态。

1.select会被调用很多次, 这就意味着:

当文件描述符过多时,文件描述符在用户空间与内核空间进行copy会很费时

当文件描述符过多时,内核对文件描述符的遍历也很浪费时间

select最大仅仅支持1024个文件描述符

 

#coding:utf-8

"""
----------------------------------------
description:

author: sss

date:
----------------------------------------
change:
   
----------------------------------------

"""
__author__ = ‘sss‘


def pr_type(i, info=‘‘):
    print(info, i, type(i))


import select
import socket
import queue
from time import sleep

# 并发服务端
def server_select():
    # create socket
   
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 非阻塞模式
   
server.setblocking(False)

    # bind the socket to the port
   
server_addr = (‘localhost‘, 9009)
    print(‘server running, port ‘.format(server_addr))
    server.bind(server_addr)

    # listen for incoming connections
   
server.listen(5)

    # define rlist
    
inputs = [server]
    outputs = [] # wlist
   
message_queues =

    while inputs:
        print(‘waiting for the next event.‘)
        readable, writable, exceptional = select.select(inputs, outputs, inputs)
        #pr_type(readable)
        #pr_type(writable)
        #pr_type(exceptional)


       
for s in readable:
            if s is server:
                connection, client_address = s.accept()
                print(‘connection from ‘.format(client_address))

                connection.setblocking(0)
                inputs.append(connection)
                message_queues[connection] = queue.Queue()
            else:
                data = s.recv(1024)
                if data != ‘‘:
                    print(‘recived "" from ""‘.format(data, s.getpeername()))
                    message_queues[s].put(data)
                    if s not in outputs:
                        outputs.append(s)
                else:
                    print(‘closing‘, client_address)
                    if s in outputs:
                        outputs.remove(s)
                    inputs.reverse(s)
                    s.close()

                    del message_queues[s]

        for s in writable:
            try:
                message_queue = message_queues.get(s)
                send_data = ‘‘
               
if message_queue is not None:
                    send_data = message_queue.get_nowait()
                else:
                    print(‘has closed‘)
            except queue.Empty:
                print(‘‘.format(s.getpeername()))
                outputs.remove(s)
            else:
                if message_queue is not None:
                    s.send(send_data)
                else:
                    print(‘has closed‘)

        for s in exceptional:
            print(‘exception condition on‘, s.getpeername())
            inputs.reverse(s)
            if s in outputs:
                outputs.remove(s)
            s.close()

            del message_queues[s]

        sleep(1)

if __name__ == ‘__main__‘:
    #pr_type(‘s‘)
   
server_select()



 

 

#coding:utf-8

"""
----------------------------------------
description:

author: sss

date:
----------------------------------------
change:
   
----------------------------------------

"""
__author__ = ‘sss‘


def pr_type(i, info=‘‘):
    print(info, i, type(i))

import select
import socket
import queue

def client_select():

    messages = [‘This is the message ‘, ‘It will be sent ‘, ‘in parts ‘, ]
    server_address = (‘localhost‘, 9009)

    # create a TCP/IP socket
   
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        , socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        , socket.socket(socket.AF_INET, socket.SOCK_STREAM)
             ]

    print(‘connecting to on port ‘.format(*server_address))

    for s in socks:
        s.connect(server_address)

    for index, message in enumerate(messages):
        for s in socks:
            print(‘: sending ""‘.format(s.getsockname(), message + str(index)))
            s.send((message + str(index)).encode(‘utf-8‘))

    for s in socks:
        data = s.recv(1024)
        print (‘%s: received "%s"‘ % (s.getsockname(), data))
        if data != "":
            print (‘closingsocket‘, s.getsockname())
            s.close()
if __name__ == ‘__main__‘:
    #pr_type(‘s‘)
   
client_select()

 

2.2.    案例2

select()实现非阻塞IO

 

# !/usr/bin/env python
# *-* coding:utf-8 *-*
import socket
import select
import sys
import signal


class ChatServer():
    def __init__(self, host, port, timeout=10, backlog=5):
        # 记录连接的客户端数量
       
self.clients = 0
        # 存储连接的客户端socket和地址对应的字典
       
self.clientmap =
        # 存储连接的客户端socket
       
self.outputs = []
        # 建立socket
       
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind((host, port))
        self.server.listen(backlog)
        # 增加信号处理
       
signal.signal(signal.SIGINT, self.sighandler)

    def sighandler(self):
        sys.stdout.write("Shutdown Server......\n")
        # 向已经连接客户端发送关系信息,并主动关闭socket
       
for output in self.outputs:
            output.send("Shutdown Server")
            output.close()
        # 关闭listen
       
self.server.close()
        sys.stdout.flush()

    # 主函数,用来启动服务器
   
def run(self):
        # 需要监听的可读对象
       
inputs = [self.server]

        runing = True
       
# 添加监听主循环
       
while runing:
            try:
                readable, writeable, exceptional = select.select(inputs, self.outputs, [])
                # 此处会被select模块阻塞,只有当监听的三个参数发生变化时,select才会返回
           
except select.error as e:
                break
           
# 当返回的readable中含有本地socket的信息时,表示有客户端正在请求连接
           
if self.server in readable:
                # 接受客户端连接请求
               
client, addr = self.server.accept()
                sys.stdout.write("New Connection from %s\n" % str(addr))
                sys.stdout.flush()
                # 更新服务器上客户端连接情况
                # 1,数量加1
                # 2,self.outputs增加一列
                # 3,self.clientmap增加一对
                # 4, 给input添加可读监控
               
self.clients += 1
                self.outputs.append(client)
                self.clientmap[client] = addr
                inputs.append(client)

            # readable中含有已经添加的客户端socket,并且可读
            # 说明 1,客户端有数据发送过来或者 2,客户端请求关闭
           
elif len(readable) != 0:
                # 1, 取出这个列表中的socket
                
csock = readable[0]
                # 2, 根据这个socket,在事先存放的clientmap中,去除客户端的地址,端口的详细信息
               
host, port = self.clientmap[csock]
                # 3,取数据, 或接受关闭请求,并处理
                # 注意,这个操作是阻塞的,但是由于数据是在本地缓存之后,所以速度会非常快
               
try:
                    data = csock.recv(1024).strip()
                    for cs in self.outputs:
                        if cs != csock:
                            cs.send("%s\n" % data)
                except socket.error as e:
                    self.clients -= 1
                    inputs.remove(csock)
                    self.outputs.remove(csock)
                    del self.clientmap[csock]
            # print self.outputs
       
self.server.close()


if __name__ == "__main__":
    chat = ChatServer("", 8008)
    chat.run()

运行这个脚本,然后用任意客户端如telnet或netcat连接8008端口,多个客户端之间就可以进行对话。

其实select模块本身是阻塞的,当需要监控的socket发生变化时,select作出返回,下面的程序会继续执行,程序根据select的返回值,对各种情况作出处理。

 

以上是关于select模块的主要内容,如果未能解决你的问题,请参考以下文章

python中的select模块

python-IO多路复用,select模块

SELECT版FTP

Python IO多路复用select模块

找不到模块“react-select/async”的声明文件

Python——IO多路复用之select模块epoll方法