如何关闭 python asyncio 传输?

Posted

技术标签:

【中文标题】如何关闭 python asyncio 传输?【英文标题】:How to close python asyncio transport? 【发布时间】:2016-06-22 17:10:33 【问题描述】:

我正在开发一个使用 python 异步套接字服务器的项目。问题是服务器停止时服务器的实现不会在传输上调用 .close() 。这似乎使客户端保持连接并导致代码的其他部分崩溃。

Python 文档说传输需要明确关闭,但在这个项目中,我不知道在哪里可以关闭它们,因为没有引用为每个客户端创建的传输。 https://docs.python.org/3/library/asyncio-dev.html#close-transports-and-event-loops

代码如下:

"""
Socket server forwarding request to internal server
"""
import logging
try:
    # we prefer to use bundles asyncio version, otherwise fallback to trollius
    import asyncio
except ImportError:
    import trollius as asyncio


from opcua import ua
from opcua.server.uaprocessor import UaProcessor

logger = logging.getLogger(__name__)


class BinaryServer(object):

    def __init__(self, internal_server, hostname, port):
        self.logger = logging.getLogger(__name__)
        self.hostname = hostname
        self.port = port
        self.iserver = internal_server
        self.loop = internal_server.loop
        self._server = None
        self._policies = []

    def set_policies(self, policies):
        self._policies = policies

    def start(self):

        class OPCUAProtocol(asyncio.Protocol):

            """
            instanciated for every connection
            defined as internal class since it needs access
            to the internal server object
            FIXME: find another solution
            """

            iserver = self.iserver
            loop = self.loop
            logger = self.logger
            policies = self._policies

            def connection_made(self, transport):
                self.peername = transport.get_extra_info('peername')
                self.logger.info('New connection from %s', self.peername)
                self.transport = transport
                self.processor = UaProcessor(self.iserver, self.transport)
                self.processor.set_policies(self.policies)
                self.data = b""

            def connection_lost(self, ex):
                self.logger.info('Lost connection from %s, %s', self.peername, ex)
                self.transport.close()
                self.processor.close()

            def data_received(self, data):
                logger.debug("received %s bytes from socket", len(data))
                if self.data:
                    data = self.data + data
                    self.data = b""
                self._process_data(data)

            def _process_data(self, data):
                buf = ua.utils.Buffer(data)
                while True:
                    try:
                        backup_buf = buf.copy()
                        try:
                            hdr = ua.Header.from_string(buf)
                        except ua.utils.NotEnoughData:
                            logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        if len(buf) < hdr.body_size:
                            logger.info("We did not receive enough data from client, waiting for more")
                            self.data = backup_buf.read(len(backup_buf))
                            return
                        ret = self.processor.process(hdr, buf)
                        if not ret:
                            logger.info("processor returned False, we close connection from %s", self.peername)
                            self.transport.close()
                            return
                        if len(buf) == 0:
                            return
                    except Exception:
                        logger.exception("Exception raised while parsing message from client, closing")
                        self.transport.close()
                        break

        coro = self.loop.create_server(OPCUAProtocol, self.hostname, self.port)
        self._server = self.loop.run_coro_and_wait(coro)
        print('Listening on :'.format(self.hostname, self.port))

    def stop(self):
        self.logger.info("Closing asyncio socket server")
        self.loop.call_soon(self._server.close)
        self.loop.run_coro_and_wait(self._server.wait_closed())

正如你所见,当我们在这个服务器类上调用 stop() 时,异步服务器调用它的 close 方法。但是,如果客户端已连接,则创建的传输永远不会关闭。

项目仓库在这里https://github.com/FreeOpcUa/python-opcua/,你可以看看Issue 137。

关闭传输对象的正确方法是什么?

【问题讨论】:

您可以在BinaryServer 属性中保留活动传输列表,例如由self.iserver.transports.append(transport)connection_made() 我试过了,它可以工作。但是,如何关闭 connection_lost() 中的正确传输?否则,随着时间的推移连接和断开连接的客户端可能会使我的self.iserver.transporst 列表变得非常大。 我想我发现我可以简单地在connection_lost() 中执行self.iserver.transports.remove(self.transport) 以在客户端断开连接时从列表中删除传输。如果这不起作用,我想我可以遍历列表并检查 transport.is_closing 并将其从列表中删除。感谢您的帮助,我将在测试后发布答案。 是的,self.iserver.transports.remove(self.transport) 应该可以正常工作。作为旁注,请记住,实际连接关闭将在transport.close() 调用之后的下一个事件循环迭代中执行。在实践中,它主要在测试中很重要:测试应该在传输关闭后推送类似 await asyncio.sleep(0) 的东西。 【参考方案1】:

我通过应用这种方法解决了这个问题:

#self.OPCUAServer - 这是我的 opcua 服务器

节点 = []

nodes.append(self.OPCUAServer.get_node("ns=0; s=Measurements")) #添加两个根节点 nodes.append(self.OPCUAServer.get_node("ns=1; s=Calibrations")) #到列表 self.OPCUAServer.delete_nodes(nodes, True) # 用这个列表递归调用delete_nodes self.OPCUAServer.stop()

【讨论】:

以上是关于如何关闭 python asyncio 传输?的主要内容,如果未能解决你的问题,请参考以下文章

异常事件循环在 python 3.8 中使用 aiohttp 和 asyncio 关闭

使用 asyncio 的 Python 网络

如何在 python asyncio 中处理 tcp 客户端套接字自动重新连接?

我想在 python 中使用 asyncio 在输入时打印一些东西[关闭]

我如何将crossbar客户端(python3,asyncio)与tkinter集成

Python Asyncio - RuntimeError:无法关闭正在运行的事件循环