论事件驱动与多路IO复用
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了论事件驱动与多路IO复用相关的知识,希望对你有一定的参考价值。
通常,我们写服务器处理模型的程序时,有以下几种模型:
- (1)每收到一个请求,创建一个新的进程,来处理该请求;
- (2)每收到一个请求,创建一个新的线程,来处理该请求;
- (3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求
上面的几种方式,各有千秋,
第(1)中方法,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。
第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题。
第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。
综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。
更多姿势
http://www.cnblogs.com/alex3714/articles/5248247.html
I/O操作由操作系统完成,遇到IO操作,主进程交给操作系统处理IO,处理完成通过回调函数告诉主进程。
IO多路复用
番外篇
http://www.cnblogs.com/alex3714/articles/5876749.html
select、poll、epoll、本质为IO多路复用。
select 版本
简单版:
import select
import socket
import queue
server = socket.socket()
server.bind(("localhost", 9990))
server.listen(1000)
server.setblocking(False) # 不阻塞
msg_dic = {}
inputs = [server,]
outputs = []
while True:
readable, writeable, exceptional = select.select(inputs, outputs, inputs)
print(readable, writeable, exceptional)
for r in readable:
if r is server:
conn, addr = server.accept()
print("来了个新连接", addr)
inputs.append(conn)
else:
try:
data = r.recv(1024)
print("收到的数据", data)
conn.send(data)
except ConnectionResetError:
print("[%s]客户端断开了" % r)
inputs.remove(r)
高级装逼版
import select
import socket
import queue
server = socket.socket()
server.setblocking(0)
server_addr = (‘localhost‘, 10000)
print(‘starting up on %s port %s‘ % server_addr)
server.bind(server_addr)
server.listen(5)
inputs = [server, ] # 自己也要监测呀,因为server本身也是个fd
outputs = []
message_queues = {}
while True:
print("waiting for next event...")
readable, writeable, exeptional = select.select(inputs, outputs, inputs) # 如果没有任何fd就绪,那程序就会一直阻塞在这里
for s in readable: # 每个s就是一个socket
if s is server: # 别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,
# 就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀
# 新连接进来了,接受这个连接
conn, client_addr = s.accept()
print("new connection from", client_addr)
conn.setblocking(0)
inputs.append(conn) # 为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
# 就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到
# readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的
message_queues[conn] = queue.Queue() # 接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送
else: # s不是server的话,那就只能是一个 与客户端建立的连接的fd了
# 客户端的数据过来了,在这接收
data = s.recv(1024)
if data:
print("收到来自[%s]的数据:" % s.getpeername()[0], data)
message_queues[s].put(data) # 收到的数据先放到queue里,一会返回给客户端
if s not in outputs:
outputs.append(s) # 为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端
else: # 如果收不到data代表什么呢? 代表客户端断开了呀
print("客户端断开了",s)
if s in outputs:
outputs.remove(s) # 清理已断开的连接
inputs.remove(s) # 清理已断开的连接
del message_queues[s] # 清理已断开的连接
for s in writeable:
try:
next_msg = message_queues[s].get_nowait()
except queue.Empty:
print("client [%s]" %s.getpeername()[0], "queue is empty..")
outputs.remove(s)
else:
print("sending msg to [%s]" % s.getpeername()[0], next_msg)
s.send(next_msg.upper())
for s in exeptional:
print("handling exception for ", s.getpeername())
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
del message_queues[s]
secoketor模块
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print("accepted", conn, "from", addr, mask)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read) # 新连接注册read回调函数
def read(conn, mask):
data = conn.recv(1024)
if data:
print("echoing", repr(data), "to", conn)
conn.send(data)
else:
print("closing", conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(("localhost", 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept) # 来新连接调accept
while True:
events = sel.select() # 默认阻塞,有活动连接就返回活动的连接列表 epoll/select
for key, mask in events:
callback = key.data # accept
callback(key.fileobj, mask) # key.fileobj= 文件句柄
客户端
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__:JasonLIN
import socket
import sys
# send_buf_size = 8192 # 设置发送缓冲域大小
# get_buf_size = 8192 # 设置接收缓冲域大小
messages = [b‘This is the message. ‘,
b‘It will be sent ‘,
b‘in parts.‘,
]
server_address = (‘192.168.31.102‘, 10000)
# Create a TCP/IP socket
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(10000)]
# Connect the socket to the port where the server is listening
print(‘connecting to %s port %s‘ % server_address)
for s in socks:
# s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
# s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf_size)
# s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, get_buf_size)
# s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.connect(server_address)
for message in messages:
# Send messages on both sockets
for s in socks:
print(‘%s: sending "%s"‘ % (s.getsockname(), message))
s.send(message)
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print(‘%s: received "%s"‘ % (s.getsockname(), data))
if not data:
print(sys.stderr, ‘closing socket‘, s.getsockname())
以上是关于论事件驱动与多路IO复用的主要内容,如果未能解决你的问题,请参考以下文章