恢复通过 websocket 发送的数据

Posted

技术标签:

【中文标题】恢复通过 websocket 发送的数据【英文标题】:Recover data sent via websocket 【发布时间】:2020-05-21 21:14:09 【问题描述】:

我使用 Websocket 检索数据以进行进一步处理。 我不知道如何在课堂之外检索它。 我使用线程模块将 websocket 与程序的其余部分分开,这样我就可以运行一个 pyqt5 应用程序,在其中显示处理后的数据,但我无法检索它。

也许我应该使用线程以外的东西,但我不知道。 因为我可以接收大量数据并且有很多工作要做,计算、显示等。我尝试将其优化到最低限度,否则它将永远无法每秒处理我的所有请求。

import websockets
import asyncio
import json
import threading
import time


class WS(object):
    def __init__(self, serveur):
        self.serveur = serveur

    async def connect(self):

        async with websockets.connect(self.serveur) as websocket:
            while True:
                message = await websocket.recv()
                self.data = json.loads(message)
                print(self.data)


uri = "wss://www.bitmex.com/realtime?subscribe=instrument:XBTUSD"
ws = WS(uri)

loop = asyncio.get_event_loop()
th1 = threading.Thread(target=lambda: loop.run_until_complete(ws.connect()))
th1.start()

while True:  # My application that will display and process the data retrieved by the websocket.
    print('blabla')
    time.sleep(3)

【问题讨论】:

【参考方案1】:

默认情况下,Qt 不支持事件循环,因为在这些情况下使用线程通常是一种有效的解决方法,但在这些情况下,最好使用诸如 qasync(python -m pip install qasync) 和 asyncqt(python -m pip install asyncqt 之类的库)。考虑到这种情况,一个可能的解决方案是使用 Qt 信号来发送信息。

import asyncio
import json
import websockets

from PyQt5 import QtCore, QtWidgets
from asyncqt import QEventLoop


class WS(QtCore.QObject):
    dataChanged = QtCore.pyqtSignal(dict)

    def __init__(self, server, parent=None):
        super().__init__(parent)
        self._server = server

    @property
    def server(self):
        return self._server

    async def connect(self):

        async with websockets.connect(self.server) as websocket:
            while True:
                message = await websocket.recv()
                data = json.loads(message)
                self.dataChanged.emit(data)


class MainWindow(QtWidgets.QMainWindow):
    def __init__(self, parent=None):
        super().__init__(parent)

        self.text_edit = QtWidgets.QTextEdit()
        self.setCentralWidget(self.text_edit)

    @QtCore.pyqtSlot(dict)
    def update_data(self, data):
        # only for test
        text = json.dumps(data)
        self.text_edit.setPlainText(text)


def main():
    import sys

    app = QtWidgets.QApplication(sys.argv)
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)

    uri = "wss://www.bitmex.com/realtime?subscribe=instrument:XBTUSD"
    ws = WS(uri)

    w = MainWindow()
    ws.dataChanged.connect(w.update_data)
    w.show()

    with loop:
        loop.create_task(ws.connect())
        loop.run_forever()


if __name__ == "__main__":
    main()

更新:

import asyncio
import json
import websockets

from PyQt5 import QtCore, QtWidgets
from asyncqt import QEventLoop, asyncSlot


class WS(QtCore.QObject):
    dataChanged = QtCore.pyqtSignal(dict)

    def __init__(self, server, parent=None):
        super().__init__(parent)
        self._server = server
        self._websocket = None

    @property
    def websocket(self):
        return self._websocket

    async def connect(self):
        self._websocket = await websockets.connect(self._server)
        await self.ready_read()

    async def ready_read(self):
        while True:
            message = await self.websocket.recv()
            data = json.loads(message)
            self.dataChanged.emit(data)

    @asyncSlot(dict)
    async def send_message(self, message):
        data = json.dumps(message)
        await self.websocket.send(data)


class MainWindow(QtWidgets.QMainWindow):
    sendMessageSignal = QtCore.pyqtSignal(dict)

    def __init__(self, parent=None):
        super().__init__(parent)

        self.button = QtWidgets.QPushButton("Press me")
        self.text_edit = QtWidgets.QTextEdit()

        central_widget = QtWidgets.QWidget()
        lay = QtWidgets.QVBoxLayout(central_widget)
        lay.addWidget(self.button)
        lay.addWidget(self.text_edit)
        self.setCentralWidget(central_widget)

        self.button.clicked.connect(self.on_clicked)

    @QtCore.pyqtSlot()
    def on_clicked(self):
        auth_data = "op": "subscribe", "args": ["instrument:XBTUSD"]
        self.sendMessageSignal.emit(auth_data)

    @QtCore.pyqtSlot(dict)
    def update_data(self, data):
        # only for test
        text = json.dumps(data)
        self.text_edit.setPlainText(text)


def main():
    import sys

    app = QtWidgets.QApplication(sys.argv)
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)

    uri = "wss://www.bitmex.com/realtime?subscribe=instrument:XBTUSD"
    ws = WS(uri)

    w = MainWindow()
    ws.dataChanged.connect(w.update_data)
    w.sendMessageSignal.connect(ws.send_message)
    w.show()

    with loop:
        loop.create_task(ws.connect())
        loop.run_forever()


if __name__ == "__main__":
    main()

【讨论】:

@antho 你想要的不是微不足道的,我建议你首先了解 Asyncio 和 PyQt 的工作原理,当你熟悉这两种技术时,然后尝试使用这两种技术,如我更新的示例所示。如果您添加更多要求,那么我将不再回复,在这种情况下,您将不得不创建一个新帖子来说明您尝试过的内容 @antho 我没有链接,但我的建议已经在我之前的评论中指出了。

以上是关于恢复通过 websocket 发送的数据的主要内容,如果未能解决你的问题,请参考以下文章

无法通过 tomcat 中的 websocket 发送二进制消息,但可以在 glassfish 中使用。使用 IllegalArgumentException 在 tomcat 中失败

WebSockets + PHP(棘轮)登录系统

WebSocket 实战

websocket._exceptions.WebSocketProxyException:通过代理连接失败状态:503

WebSocket服务端和客户端代码示例

WebSocket服务端和客户端代码示例