当python中的连接失败时如何重新连接到websocket
Posted
技术标签:
【中文标题】当python中的连接失败时如何重新连接到websocket【英文标题】:How to reconnect to a websocket when connection fails in python 【发布时间】:2019-12-14 13:28:18 【问题描述】:您好,我创建了异步 websocket 客户端,我可以异步接收和发送消息。
这是我使用的 clint 类。
import websockets
import asyncio
from models import Heartbeat
from model_helper import ModelHelper
from json_helper import JSONHelper
from os.path import dirname, abspath
import logging
import time
waiting_time = 0.5
class WebSocketClient():
def __init__(self, websocket_queue, configs):
self.Heartbeat = Heartbeat().message
self.websocket_queue = websocket_queue
self.configs = configs
def get_connection_string(self):
server_url = self.configs.URL + ":" + str(self.configs.Port) + self.configs.Root
return server_url
async def connect(self):
try:
server_url = self.get_connection_string()
self.connection = await websockets.client.connect(server_url)
if self.connection.open:
print("Connection stablished. Client correcly connected")
# Send greeting
await self.connection.send(self.Heartbeat)
return self.connection
else:
print("Can not connect")
except ConnectionRefusedError as err:
print("Connection Error: ".format(err))
async def send_message_to_socket(self, connection):
while True:
message = self.websocket_queue.get_send_queue()
try:
if message is not None:
message_ = ModelHelper.to_outgoing_request_model(message)
await connection.send(message_)
else:
await asyncio.sleep(waiting_time)
except websockets.exceptions.ConnectionClosed:
print('Connection with server closed')
self.connect()
async def receive_message_from_socket(self, connection):
while True:
try:
message = await connection.recv()
obj = JSONHelper.toObject(message)
print("Received object from websocket: ".format(obj))
#If a websocket entry has SendMessage in its Action property
#Consider it as an sms content to be sent.
if(obj.Action == "SendMessage"):
self.websocket_queue.add_received_queue(obj)
except websockets.exceptions.ConnectionClosed:
print('Connection with server closed1')
async def send_heartbeat_to_socket(self, connection):
while True:
#print("Heartbeat loop\n")
try:
await connection.send(self.Heartbeat)
await asyncio.sleep(3)
except websockets.exceptions.ConnectionClosed:
print('Connection with server closed2')
这是我监听 websocket 消息的代码:
def listen_websocket_routine(sms_queue, websocket_queue):
while True:
time.sleep(waiting_time)
#Check Websocket queue for new messages
message = websocket_queue.get_received_queue()
# If Incoming websocket JSON has SendMessage string in its Action attribute transform it to request_model
if message is not None and send_sms.Action == "SendMessage":
# Transform it to outgoing_sms_model
reset = ModelHelper.to_outgoing_sms_model(message.Data)
# Add To send_queue of sms
sms_queue.add_send_queue(reset)
最后我如何使用异步和线程启动它们。
client = WebSocketClient(websocket_queue, configs)
loop = asyncio.get_event_loop()
connection = loop.run_until_complete(client.connect())
task1 = asyncio.ensure_future(client.receive_message_from_socket(connection))
task2 = asyncio.ensure_future(client.send_heartbeat_to_socket(connection))
task3 = asyncio.ensure_future(client.send_message_to_socket(connection))
listen_websocket_thread = threading.Thread(target=listen_websocket_routine, args=(sms_queue, websocket_queue))
listen_websocket_thread.start()
loop.run_forever()
所以我的问题是,每当连接中断时,我都需要重新建立连接。但我不确定我应该在哪里做。应该是在我每次尝试发送或接收消息之前,还是应该以更一般的方式进行?
【问题讨论】:
【参考方案1】:自从更新10.0 之后,您可以使用异步迭代器来完成,如下所示(来自官方doc 的示例):
async for websocket in websockets.connect(...):
try:
...
except websockets.ConnectionClosed:
continue
【讨论】:
以上是关于当python中的连接失败时如何重新连接到websocket的主要内容,如果未能解决你的问题,请参考以下文章
当连接变坏时,有啥方法可以让 JBoss 连接池重新连接到 Oracle?