一个简单的单线程异步服务器

Posted alben-xue

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个简单的单线程异步服务器相关的知识,希望对你有一定的参考价值。

使用内置的 select 模块构造单线程异步服务器

关键:

poll(),生成器

 

服务器模块:

#!/usr/bin/env python3
#-*- encoding:UTF -*-

import argparse,socket,time

# Request/response table for the toy protocol, as bytes. Every request ends
# with b'?' and every reply ends with b'.'; handle_request() and the client
# pass those terminators to recv_until() to find message boundaries.
aphorisms = {
    b'Beautiful is better than?': b'Ugly.',
    b'Explicit is better than?': b'Implicit.',
    b'Simple is better than?': b'Complex.',
}

def get_answer(aphorism):
    """Return the reply for the request *aphorism* (bytes).

    Looks the request up in the module-level ``aphorisms`` table and falls
    back to an error reply when the request is not a known key.
    """
    # The fallback must end with b'.' like every other reply: the client
    # reads replies with recv_until(sock, b'.') and would otherwise keep
    # waiting for a terminator that never arrives.
    return aphorisms.get(aphorism, b'Error:unknown aphorism.')

def parse_command_line(description):
    """Parse ``sys.argv`` and return the ``(host, port)`` address tuple.

    *description* is shown in the generated ``--help`` text. The command
    line takes a required positional ``host`` and an optional ``-p`` port
    that defaults to 1060.
    """
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('host', help='IP or hostname')
    parser.add_argument('-p', metavar='port', type=int, default=1060,
                        help='TCP port (default 1060)')
    args = parser.parse_args()
    address = (args.host, args.p)
    return address

def create_srv_socket(address):
    """Create, bind and return a listening TCP socket for *address*.

    *address* is the ``(host, port)`` tuple to bind to. Any ``OSError``
    raised while configuring the socket is re-raised after the socket has
    been closed.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(address)
        listener.listen(64)
    except OSError:
        # Do not leak the descriptor when e.g. the port is already in use.
        listener.close()
        raise
    print('Listening at {}'.format(address))
    return listener

def accept_connections_forever(listener):
    """Accept connections on *listener* forever, serving one at a time.

    Each accepted client is handed to handle_conversation(), and the next
    accept() only happens after that call returns.
    """
    while True:
        sock, address = listener.accept()
        print('Accepted connection from {}'.format(address))
        handle_conversation(sock, address)

def handle_conversation(sock, address):
    """Answer requests arriving on *sock* until the client is done.

    *address* is only used in the log messages. EOFError (raised by
    recv_until() when the peer closes between requests) ends the
    conversation quietly; any other exception is reported. The client
    socket is closed in every case.
    """
    try:
        while True:
            handle_request(sock)
    except EOFError:
        print('Client socket to {} has closed'.format(address))
    except Exception as e:
        print('Client {} error :{}'.format(address, e))
    finally:
        # Close the client connection itself, not the `socket` module.
        sock.close()

def handle_request(sock):
    """Read one request from *sock* and send back its answer.

    A request is everything received up to and including a trailing b'?'.
    Exceptions raised by recv_until() propagate to the caller.
    """
    aphorism = recv_until(sock, b'?')
    answer = get_answer(aphorism)
    sock.sendall(answer)

def recv_until(sock, suffix):
    """Receive bytes from *sock* until the data ends with *suffix*.

    Returns everything received, including *suffix*.

    Raises:
        EOFError: the peer closed the socket before sending any data.
        IOError: the peer closed the socket part-way through a message.
    """
    message = sock.recv(4096)
    if not message:
        raise EOFError('socket closed')
    while not message.endswith(suffix):
        data = sock.recv(4096)
        if not data:
            # Closing mid-message is an error, unlike a clean close between
            # messages, so it gets a different exception type.
            raise IOError('received {!r} then socket closed'.format(message))
        message += data
    return message

 

客户端模块:

#!/usr/bin/env python3
#-*- encoding:UTF-8 -*-

import argparse,random,socket,zen_utils

def client(address, cause_error=False):
    """Connect to the server at *address* and exchange a few aphorisms.

    Normally sends three randomly chosen requests from
    ``zen_utils.aphorisms`` and prints each request with its reply. When
    *cause_error* is true it instead sends the first request without its
    final byte and returns, leaving the server with an incomplete request.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # `with` closes the socket on every exit path: the normal one, the
    # early return below, and any exception raised while talking.
    with sock:
        sock.connect(address)
        aphorisms = list(zen_utils.aphorisms)
        if cause_error:
            # Drop the trailing b'?' so the request is never completed.
            sock.sendall(aphorisms[0][:-1])
            return
        for aphorism in random.sample(aphorisms, 3):
            sock.sendall(aphorism)
            print(aphorism, zen_utils.recv_until(sock, b'.'))

# Script entry point: parse host, port and the optional -e flag, then run
# one client session against that address.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Example client')
    parser.add_argument('host', help='IP or Hostname')
    parser.add_argument('-p', metavar='port', type=int, default=1060,
                        help='TCP Port number')
    parser.add_argument('-e', action='store_true', help='cause an error')
    args = parser.parse_args()
    address = (args.host, args.p)
    client(address, args.e)

异步服务器:

#!/usr/bin/env python3
#-*- encoding:UTF -*-

import select,zen_utils

def all_events_forever(poll_object):
    """Yield ``(fd, event)`` pairs from *poll_object* without ever stopping.

    Calls ``poll_object.poll()`` again each time the previous batch of
    events has been consumed.
    """
    while True:
        ready = poll_object.poll()
        for descriptor, flags in ready:
            yield descriptor, flags

def serve(listener):
    """Serve clients of *listener* from a single-threaded poll() loop.

    State kept per connection:
      * ``sockets``        -- file descriptor -> socket object
      * ``addresses``      -- socket -> peer address (for log messages)
      * ``bytes_received`` -- socket -> bytes of a request not yet complete
      * ``bytes_to_send``  -- socket -> bytes of a reply not yet sent

    A client socket is registered for POLLIN while a request is being read
    and switched to POLLOUT while its reply is being written.
    """
    sockets = {listener.fileno(): listener}
    addresses = {}
    bytes_received = {}
    bytes_to_send = {}

    poll_object = select.poll()
    poll_object.register(listener, select.POLLIN)

    def drop_client(sock, fd):
        """Log why the client went away and discard all state kept for it."""
        address = addresses.pop(sock)
        rb = bytes_received.pop(sock, b'')
        sb = bytes_to_send.pop(sock, b'')
        if rb:
            print('Client {} sent {} but then closed'.format(address, rb))
        elif sb:
            print('Client {} closed before we sent {}'.format(address, sb))
        else:
            print('Client {} closed socket normally'.format(address))
        poll_object.unregister(fd)
        del sockets[fd]
        sock.close()

    for fd, event in all_events_forever(poll_object):
        sock = sockets[fd]

        # Socket closed or in error: forget everything about this client.
        if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            drop_client(sock, fd)

        # New connection: set it up to be read from.
        elif sock is listener:
            sock, address = sock.accept()
            print('Accepted connection from {}'.format(address))
            sock.setblocking(False)
            sockets[sock.fileno()] = sock
            addresses[sock] = address
            poll_object.register(sock, select.POLLIN)

        # Incoming data: accumulate it until a full request has arrived.
        elif event & select.POLLIN:
            more_data = sock.recv(4096)
            if not more_data:
                # End of file. Clean up right away rather than closing and
                # waiting for a later POLLNVAL: an accept() handled in the
                # same batch could reuse the descriptor number and orphan
                # this client's state.
                drop_client(sock, fd)
                continue
            data = bytes_received.pop(sock, b'') + more_data
            if data.endswith(b'?'):
                bytes_to_send[sock] = zen_utils.get_answer(data)
                poll_object.modify(sock, select.POLLOUT)
            else:
                # Request still incomplete: keep what we have so far.
                bytes_received[sock] = data

        # Socket writable: send as much of the pending reply as it accepts.
        elif event & select.POLLOUT:
            data = bytes_to_send.pop(sock)
            n = sock.send(data)
            if n < len(data):
                bytes_to_send[sock] = data[n:]
            else:
                poll_object.modify(sock, select.POLLIN)

# Script entry point: build the listening socket from the command line and
# run the poll()-driven server loop on it.
if __name__ == '__main__':
    address = zen_utils.parse_command_line('Low-level async server')
    listener = zen_utils.create_srv_socket(address)
    serve(listener)

这个异步服务器的核心是它的缓冲区:

在等待某个请求完成时,会将收到的数据存储在bytes_received字典中;在等待操作系统安排发送数据时,会将要发送的字节存储在bytes_to_send字典中。

这两个缓冲区与我们告知poll()要在每个套接字上等待的事件一起形成了一个完整的状态机,用于一步一步地处理客户端会话。

 

以上是关于一个简单的单线程异步服务器的主要内容,如果未能解决你的问题,请参考以下文章

js的单线程和异步

node.js的单线程异步是什么意思呢?(转)

漫谈:一个简单的单线程基于epoll的echo服务器(附简单的性能测试)

Javascript的单线程和异步编程

我想这次我真的理解了 JavaScript 的单线程机制

csharp 线程和异步任务片段