如何在 python asyncio 中处理 tcp 客户端套接字自动重新连接?
Posted
技术标签:
【中文标题】如何在 python asyncio 中处理 tcp 客户端套接字自动重新连接?【英文标题】:how to handle tcp client socket auto reconnect in python asyncio? 【发布时间】:2017-05-26 07:59:41 【问题描述】:我正在使用 python asyncio 流连接到多个套接字服务器,但是当服务器关闭时,我的代码无法自动重新连接。
我需要的是,当服务器关闭时,我的脚本将尝试每 5 秒重新连接一次,直到连接并再次开始解析数据。
import asyncio
server1 = 'host': '192.168.1.51', 'port': 11110
server2 = 'host': '192.168.1.52', 'port': 11110
async def tcp_client(host, port, loop):
print('connect to server '.format(host, str(port)))
reader, writer = await asyncio.open_connection(host, port, loop=loop)
while True:
data = await reader.read(100)
print('raw data received: '.format(data))
await asyncio.sleep(0.1)
loop = asyncio.get_event_loop()
try:
for server in [server1, server2]:
loop.run_until_complete(tcp_client(server['host'], server['port'], loop))
print('task added: connect to server '.format(server['host'], server['port']))
finally:
loop.close()
print('loop closed')
【问题讨论】:
首先您的代码将无法运行:您无法使用点符号访问字典中的项目 - 使用server['host']
或 server.get('host')
。
另一个问题是loop.run_until_complete()
一直运行到它完成,即它阻塞。因此,一次只能连接一台服务器。
还有一个问题是tcp_client()
需要检查连接是否关闭。你可以知道当data
是空字符串时 - 然后从函数中返回。
1.我已经更正了代码。 2.所以我需要使用loop.call_soon()而不是loop.run_until_complete()来运行任务? 3.我可以使用len(data) == 0来检查连接是否关闭?
2.考虑使用asyncio.gather 或asyncio.wait 3. 见StreamReader.at_eof
【参考方案1】:
您可以通过简单地循环 try/except
语句来处理重新连接。
此外,asyncio.wait_for 可用于设置读取操作的超时时间。
考虑这个工作示例:
import asyncio
async def tcp_client(host, port):
reader, writer = await asyncio.open_connection(host, port)
try:
while not reader.at_eof():
data = await asyncio.wait_for(reader.read(100), 3.0)
print('raw data received: '.format(data))
finally:
writer.close()
async def tcp_reconnect(host, port):
server = ' '.format(host, port)
while True:
print('Connecting to server ...'.format(server))
try:
await tcp_client(host, port)
except ConnectionRefusedError:
print('Connection to server failed!'.format(server))
except asyncio.TimeoutError:
print('Connection to server timed out!'.format(server))
else:
print('Connection to server is closed.'.format(server))
await asyncio.sleep(2.0)
async def main():
servers = [('localhost', 8888), ('localhost', 9999)]
coros = [tcp_reconnect(host, port) for host, port in servers]
await asyncio.gather(*coros)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
【讨论】:
我使用了你的代码但失败了。当我拔下我的服务器并再次插入它们时,连接没有再次重新启动。以下是日志: 2017-05-26 21:39:54,239 - 连接到服务器 10.0.1.68 1100 ... 2017-05-26 21:39:54,240 - 连接到服务器 10.0.1.69 1100 ... 2017-05 -26 21:39:54,977 - 收到的原始数据:b'030 1495834968 00.000 00.004 00.010 00.007 00.004 00.000 00.000 00.000\n' @DesmondChen 当您拔下服务器时可能会引发另一个异常,您可以尝试将ConnectionRefusedError
替换为Exception
。此外,使用await asyncio.gather(*coros)
将导致您的程序在第一个意外异常处停止。
我将“ConnectionRefusedError 更改为 Exception”和“await asyncio.wait(coros)”更改为“await asyncio.gather(*coros)”。但它仍然不起作用。我认为它卡在 async def tcp_client(host, port) 并且没有引发异常。
@DesmondChen 您可能希望使用 netcat 服务器运行测试(例如运行 nc -l 8888
,向标准输入写入消息,按 Enter 并使用 CTRL+C 中断)以查看问题是否来自您的服务器。
好像buffer为空的时候read.at_eof()还是False。以上是关于如何在 python asyncio 中处理 tcp 客户端套接字自动重新连接?的主要内容,如果未能解决你的问题,请参考以下文章
如何修复'错误:asyncio:任务已被破坏,但它正在等待处理! Python中的错误
如何在 Python 3.8 中为 asyncio.gather 构建任务列表
如何使用 python 的 asyncio 模块正确创建和运行并发任务?