当python中的连接失败时如何重新连接到websocket

Posted

技术标签:

【中文标题】当python中的连接失败时如何重新连接到websocket【英文标题】:How to reconnect to a websocket when connection fails in python 【发布时间】:2019-12-14 13:28:18 【问题描述】:

您好,我创建了异步 websocket 客户端,我可以异步接收和发送消息。

这是我使用的 clint 类。

import websockets
import asyncio
from models import Heartbeat
from model_helper import ModelHelper
from json_helper import JSONHelper
from os.path import dirname, abspath
import logging
import time
waiting_time = 0.5

class WebSocketClient():

    def __init__(self, websocket_queue, configs):
        self.Heartbeat = Heartbeat().message
        self.websocket_queue = websocket_queue
        self.configs = configs

    def get_connection_string(self):
        server_url = self.configs.URL + ":" + str(self.configs.Port) + self.configs.Root
        return server_url


    async def connect(self):

        try:
            server_url = self.get_connection_string()
            self.connection = await websockets.client.connect(server_url)

            if self.connection.open:
                print("Connection stablished. Client correcly connected")
                # Send greeting
                await self.connection.send(self.Heartbeat)
                return self.connection

            else:
                print("Can not connect")

        except ConnectionRefusedError as err:
            print("Connection Error: ".format(err))


    async def send_message_to_socket(self, connection):
        while True:
            message = self.websocket_queue.get_send_queue()
            try:
                if message is not None:
                    message_ = ModelHelper.to_outgoing_request_model(message)
                    await connection.send(message_)
                else:
                    await asyncio.sleep(waiting_time)
            except websockets.exceptions.ConnectionClosed:
                print('Connection with server closed')
                self.connect()


    async def receive_message_from_socket(self, connection):
        while True:
            try:
                message = await connection.recv()
                obj = JSONHelper.toObject(message)
                print("Received object from websocket: ".format(obj))


                #If a websocket entry  has SendMessage in its Action property
                #Consider it as an sms content to be sent.
                if(obj.Action == "SendMessage"):
                    self.websocket_queue.add_received_queue(obj)

            except websockets.exceptions.ConnectionClosed:
                print('Connection with server closed1')


    async def send_heartbeat_to_socket(self, connection):
        while True:            
            #print("Heartbeat loop\n")
            try:
                await connection.send(self.Heartbeat)
                await asyncio.sleep(3)
            except websockets.exceptions.ConnectionClosed:
                print('Connection with server closed2')

这是我监听 websocket 消息的代码:

def listen_websocket_routine(sms_queue, websocket_queue):
    while True:
        time.sleep(waiting_time)
        #Check Websocket queue for new messages
        message = websocket_queue.get_received_queue()

        # If Incoming websocket JSON has SendMessage string in its Action attribute transform it to request_model
        if message is not None and send_sms.Action == "SendMessage":
            # Transform it to  outgoing_sms_model
            reset = ModelHelper.to_outgoing_sms_model(message.Data)
            # Add To send_queue of sms
            sms_queue.add_send_queue(reset)

最后我如何使用异步和线程启动它们。

client = WebSocketClient(websocket_queue, configs)

loop = asyncio.get_event_loop()
connection = loop.run_until_complete(client.connect())

task1 = asyncio.ensure_future(client.receive_message_from_socket(connection))
task2 = asyncio.ensure_future(client.send_heartbeat_to_socket(connection))
task3 = asyncio.ensure_future(client.send_message_to_socket(connection))


listen_websocket_thread = threading.Thread(target=listen_websocket_routine, args=(sms_queue, websocket_queue))

listen_websocket_thread.start()

loop.run_forever()

所以我的问题是,每当连接中断时,我都需要重新建立连接。但我不确定我应该在哪里做。应该是在我每次尝试发送或接收消息之前,还是应该以更一般的方式进行?

【问题讨论】:

【参考方案1】:

自从更新10.0 之后,您可以使用异步迭代器来完成,如下所示(来自官方doc 的示例):

async for websocket in websockets.connect(...):
    try:
        ...
    except websockets.ConnectionClosed:
        continue

【讨论】:

以上是关于当python中的连接失败时如何重新连接到websocket的主要内容,如果未能解决你的问题,请参考以下文章

当连接变坏时,有啥方法可以让 JBoss 连接池重新连接到 Oracle?

用FlashFXP时总是连接失败(连接丢失)原因是啥

连接到 *** 时如何使用 ngrok 进行隧道传输?

自动重新连接到 TCP 主机

一旦连接失败,Spring消费者没有连接到rabbitmq队列

如何使用ODBC数据连接连接到本地SQL Server? (我得到“登录失败”)