Autobahn Asyncio ReconnectingClientFactory

Posted

技术标签:

【中文标题】Autobahn Asyncio ReconnectingClientFactory【英文标题】: 【发布时间】:2016-09-26 19:47:07 【问题描述】:

我想用 asyncio 创建一个ReconnectingClientFactory。特别是处理客户端启动时服务器不可用的情况,在这种情况下ReconnectingClientFactory 将继续尝试。这是asyncio.events.create_connection 不做的事情。

具体来说:

EchoClient 示例就可以了。 关键是如何建立联系。

factory = EchoClientFactory('ws://127.0.0.1:5678')
connectWS(factory)

twisted 版本的情况下,ReconnectingClientFactory

对比

factory = EchoClientFactory(u"ws://127.0.0.1:5678")
factory.protocol = SecureServerClientProtocol

loop = asyncio.get_event_loop()

# coro = loop.create_connection(factory, 'ws_server', 5678)
coro = loop.create_connection(factory, '127.0.0.1', 5678)

loop.run_until_complete(asyncio.wait([
    alive(), coro
]))
loop.run_forever()
loop.close()

或与asycnio 版本类似。

问题在于,在 asyncio 版本中,连接是由 asyncio.events.create_connection 建立的,如果服务器不可用,它就会失败。

如何调和两者?

非常感谢

【问题讨论】:

请澄清。你到底想达到什么目标,什么是足够的?您的问题目前至少涉及三个(可能不同)的问题。 嗨 Petri,我想用 asyncio 创建一个 ReconnectingClientFactory,它可以处理当客户端通过等待连接启动时服务器不可用的情况。这足够具体吗?干杯 【参考方案1】:

我想我得到了你想要的。这是基于asyncio TCP echo client protocol example的代码和示例。

import asyncio
import random


class ReconnectingTCPClientProtocol(asyncio.Protocol):
    max_delay = 3600
    initial_delay = 1.0
    factor = 2.7182818284590451
    jitter = 0.119626565582
    max_retries = None

    def __init__(self, *args, loop=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._args = args
        self._kwargs = kwargs
        self._retries = 0
        self._delay = self.initial_delay
        self._continue_trying = True
        self._call_handle = None
        self._connector = None

    def connection_lost(self, exc):
        if self._continue_trying:
            self.retry()

    def connection_failed(self, exc):
        if self._continue_trying:
            self.retry()

    def retry(self):
        if not self._continue_trying:
            return

        self._retries += 1
        if self.max_retries is not None and (self._retries > self.max_retries):
            return

        self._delay = min(self._delay * self.factor, self.max_delay)
        if self.jitter:
            self._delay = random.normalvariate(self._delay,
                                               self._delay * self.jitter)
        self._call_handle = self._loop.call_later(self._delay, self.connect)

    def connect(self):
        if self._connector is None:
            self._connector = self._loop.create_task(self._connect())

    async def _connect(self):
        try:
            await self._loop.create_connection(lambda: self,
                                               *self._args, **self._kwargs)
        except Exception as exc:
            self._loop.call_soon(self.connection_failed, exc)
        finally:
            self._connector = None

    def stop_trying(self):
        if self._call_handle:
            self._call_handle.cancel()
            self._call_handle = None
        self._continue_trying = False
        if self._connector is not None:
            self._connector.cancel()
            self._connector = None


if __name__ == '__main__':
    class EchoClientProtocol(ReconnectingTCPClientProtocol):
        def __init__(self, message, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.message = message

        def connection_made(self, transport):
            transport.write(self.message.encode())
            print('Data sent: !r'.format(self.message))

        def data_received(self, data):
            print('Data received: !r'.format(data.decode()))

        def connection_lost(self, exc):
            print('The server closed the connection')
            print('Stop the event loop')
            self._loop.stop()


    loop = asyncio.get_event_loop()
    client = EchoClientProtocol('Hello, world!', '127.0.0.1', 8888, loop=loop)
    client.connect()
    loop.run_forever()
    loop.close()

【讨论】:

好像是twistedmatrix.com/documents/8.2.0/api/…的异步版本

以上是关于Autobahn Asyncio ReconnectingClientFactory的主要内容,如果未能解决你的问题,请参考以下文章

结合 2 个基于 asyncio 的代码段

autobahn.twisted.websocket 打开握手错误 400

Autobahn websockets Android 演示崩溃

如何为动态聊天室配置 Autobahn(crossbar.io)?

如何从协议外部发送 Autobahn/Twisted WAMP 消息?

如何将 HTTP 请求升级到 Websocket (Autobahn & Twisted Web)