如何在 python asyncio 中处理 tcp 客户端套接字自动重新连接?

Posted

技术标签:

【中文标题】如何在 python asyncio 中处理 tcp 客户端套接字自动重新连接?【英文标题】:how to handle tcp client socket auto reconnect in python asyncio? 【发布时间】:2017-05-26 07:59:41 【问题描述】:

我正在使用 python asyncio 流连接到多个套接字服务器,但是当服务器关闭时,我的代码无法自动重新连接。

我需要的是,当服务器关闭时,我的脚本将尝试每 5 秒重新连接一次,直到连接并再次开始解析数据。

import asyncio

server1 = 'host': '192.168.1.51', 'port': 11110
server2 = 'host': '192.168.1.52', 'port': 11110


async def tcp_client(host, port, loop):
    print('connect to server  '.format(host, str(port)))
    reader, writer = await asyncio.open_connection(host, port, loop=loop)

    while True:
        data = await reader.read(100)
        print('raw data received: '.format(data))

        await asyncio.sleep(0.1)


loop = asyncio.get_event_loop()
try:
    for server in [server1, server2]:
        loop.run_until_complete(tcp_client(server['host'], server['port'], loop))
        print('task added: connect to server  '.format(server['host'], server['port']))
finally:
    loop.close()
    print('loop closed')

【问题讨论】:

首先您的代码将无法运行:您无法使用点符号访问字典中的项目 - 使用 server['host']server.get('host') 另一个问题是loop.run_until_complete() 一直运行到它完成,即它阻塞。因此,一次只能连接一台服务器。 还有一个问题是tcp_client()需要检查连接是否关闭。你可以知道当data 是空字符串时 - 然后从函数中返回。 1.我已经更正了代码。 2.所以我需要使用loop.call_soon()而不是loop.run_until_complete()来运行任务? 3.我可以使用len(data) == 0来检查连接是否关闭? 2.考虑使用asyncio.gather 或asyncio.wait 3. 见StreamReader.at_eof 【参考方案1】:

您可以通过简单地循环 try/except 语句来处理重新连接。

此外,asyncio.wait_for 可用于设置读取操作的超时时间。

考虑这个工作示例:

import asyncio

async def tcp_client(host, port):
    reader, writer = await asyncio.open_connection(host, port)
    try:
        while not reader.at_eof():
            data = await asyncio.wait_for(reader.read(100), 3.0)
            print('raw data received: '.format(data))
    finally:
        writer.close()

async def tcp_reconnect(host, port):
    server = ' '.format(host, port)
    while True:
        print('Connecting to server  ...'.format(server))
        try:
            await tcp_client(host, port)
        except ConnectionRefusedError:
            print('Connection to server  failed!'.format(server))
        except asyncio.TimeoutError:
            print('Connection to server  timed out!'.format(server))
        else:
            print('Connection to server  is closed.'.format(server))
        await asyncio.sleep(2.0)

async def main():
    servers = [('localhost', 8888), ('localhost', 9999)]
    coros = [tcp_reconnect(host, port) for host, port in servers]
    await asyncio.gather(*coros)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

【讨论】:

我使用了你的代码但失败了。当我拔下我的服务器并再次插入它们时,连接没有再次重新启动。以下是日志: 2017-05-26 21:39:54,239 - 连接到服务器 10.0.1.68 1100 ... 2017-05-26 21:39:54,240 - 连接到服务器 10.0.1.69 1100 ... 2017-05 -26 21:39:54,977 - 收到的原始数据:b'030 1495834968 00.000 00.004 00.010 00.007 00.004 00.000 00.000 00.000\n' @DesmondChen 当您拔下服务器时可能会引发另一个异常,您可以尝试将ConnectionRefusedError 替换为Exception。此外,使用await asyncio.gather(*coros) 将导致您的程序在第一个意外异常处停止。 我将“ConnectionRefusedError 更改为 Exception”和“await asyncio.wait(coros)”更改为“await asyncio.gather(*coros)”。但它仍然不起作用。我认为它卡在 async def tcp_client(host, port) 并且没有引发异常。 @DesmondChen 您可能希望使用 netcat 服务器运行测试(例如运行 nc -l 8888,向标准输入写入消息,按 Enter 并使用 CTRL+C 中断)以查看问题是否来自您的服务器。 好像buffer为空的时候read.at_eof()还是False。

以上是关于如何在 python asyncio 中处理 tcp 客户端套接字自动重新连接?的主要内容,如果未能解决你的问题,请参考以下文章

如何修复'错误:asyncio:任务已被破坏,但它正在等待处理! Python中的错误

如何在 Python 3.8 中为 asyncio.gather 构建任务列表

如何使用 python 的 asyncio 模块正确创建和运行并发任务?

Python asyncio:处理潜在的无限列表

python 在asyncio的事件循环上运行Tornado,包括请求处理程序中的'yield from'支持

如何在asyncio python中使用子进程模块限制并发进程数