如何通过python3 asyncio reuse_port编写正确的多进程服务器程序?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何通过python3 asyncio reuse_port编写正确的多进程服务器程序?相关的知识,希望对你有一定的参考价值。

我打算通过python3 + asyncio编写一个多进程服务器程序,我发现在AbstractEventLoop.create_server中有一个名为'reuse_port'的参数,它看起来就像我想要的那样。

所以我写了一些代码,我使用多处理来创建一些进程,每个进程创建一个asyncio事件循环,所有这些进程都在同一个端口上侦听。

我认为这些进程可以协同工作,以响应请求,但是当我测试这个服务器程序时,我发现只有一个进程会一直响应我的请求。

那么为什么其他进程不响应请求呢?我的代码中有没有BUGS?

OSX10.11 + PYTHON3.5.2

服务器:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import os
import sys
import multiprocessing
import asyncio
import socket

tcp_listen_port = 44330


class Listener:
    def __init__(self, protocol, listen_port, listen_host='localhost'):
        self._protocol = protocol
        self._listen_port = listen_port
        self._listen_host = listen_host
        self._loop = None
        self._server = None
        self._pid = os.getpid()

    def run(self):
        asyncio.set_event_loop(asyncio.new_event_loop())
        self._loop = asyncio.get_event_loop()

        coro = self._loop.create_server(
            self._protocol,
            host=self._listen_host,
            port=self._listen_port,
            family=socket.AF_INET,
            reuse_port=True
        )

        self._server = self._loop.run_until_complete(coro)

        print('Listener Server on {}, pid {}'.format(
            self._server.sockets[0].getsockname(),
            self._pid
            ))
        self._loop.run_forever()

    def close(self):
        self._server.close()
        self._loop.run_until_complete(self._server.wait_closed())
        self._loop.close()


class ProtocolEcho(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        data = 'hello python asyncio from pid {}

'.format(os.getpid()).encode()
        self.transport.write(data)
        self.transport.close()


def create_tcp_srv(listen_port):
    listener = Listener(ProtocolEcho, listen_port)
    try:
        listener.run()
    except KeyboardInterrupt:
        listener.close()


def main():
    cpu_count = multiprocessing.cpu_count()

    srvproclist = list()
    for i in range(cpu_count):
        p = multiprocessing.Process(
            target=create_tcp_srv,
            args=(tcp_listen_port,)
            )
        srvproclist.append(p)

    for proc in srvproclist:
        proc.start()

    for proc in srvproclist:
        proc.join()


if __name__ == '__main__':
    main()

客户

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import os
import asyncio
import multiprocessing

async def req():
    connect = asyncio.open_connection('localhost', 44330)
    reader, writer = await connect

    writer.write('hello'.encode('utf-8'))
    await writer.drain()

    while True:
        line = await reader.readline()

        if line == b'
':
            break

        print('proc {} recv {}'.format(os.getpid(), line.decode()))

    writer.close()


def begin_test():
    asyncio.set_event_loop(asyncio.new_event_loop())
    loop = asyncio.get_event_loop()
    tasks = [req() for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()


plist = list()
for i in range(4):
    p = multiprocessing.Process(
        target=begin_test
        )
    plist.append(p)

for proc in plist:
    proc.start()

for proc in plist:
    proc.join()

客户端输出如下:

proc 72319 recv hello python asyncio from pid 72310
proc 72318 recv hello python asyncio from pid 72310
proc 72319 recv hello python asyncio from pid 72310
proc 72320 recv hello python asyncio from pid 72310
proc 72321 recv hello python asyncio from pid 72310
proc 72319 recv hello python asyncio from pid 72310
proc 72318 recv hello python asyncio from pid 72310
proc 72320 recv hello python asyncio from pid 72310
proc 72321 recv hello python asyncio from pid 72310
proc 72318 recv hello python asyncio from pid 72310
proc 72320 recv hello python asyncio from pid 72310
proc 72321 recv hello python asyncio from pid 72310

最后一个数字'72310'是响应我的请求的服务器进程pid,所以我认为只有一个进程正在运行。但为什么...

答案

要让任何程序一次完成多项任务,需要花费大量的工作。您正在使用一个永无止境的进程创建一个服务器,并且没有使用回调指导程序(即connection_made(),data_received()等)。但是,这是一个简单的解决方案,使用协同程序。

简单地说,看起来你没有任何事件可以调用额外的回调。

在您的脚本中实现协同程序将是轻而易举的,因为您看起来为每个进程返回相同的字符串。你要做的就是在coroutine装饰器(@ asyncio.coroutine)中包装每个函数(进程),然后根据你认为合适的方式设置coroutine程序。

检查this了。这篇关于asyncio的文章很棒。 Asyncio现在正在进行大量宣传,因为它正在编写程序,可以同时执行多项操作,而且更容易。

'yield from'的实施尤其令人敬畏。

注意:我会把它放在评论中,但我还没有街头信誉。一天....

以上是关于如何通过python3 asyncio reuse_port编写正确的多进程服务器程序?的主要内容,如果未能解决你的问题,请参考以下文章

Python3 asyncio:wait_for()通信()超时,如何获得部分结果?

Python3 asyncio 简介

Python3 asyncio异步编程介绍

asyncio--python3未来并发编程主流充满野心的模块

python协程(4):asyncio

如何使用 python 的 asyncio 模块正确创建和运行并发任务?