带有 Gui 的 Websocket
Posted
技术标签:
【中文标题】带有 Gui 的 Websocket【英文标题】:Websocket with Gui 【发布时间】:2020-06-17 14:54:35 【问题描述】:我的第一个类允许我连接到 websocket,第二个类允许我尝试用图表来显示数据。
在代码中,我只是想在 websocket 运行的同时用 pyqtgraph 显示一个图表,但窗口完全无法正常工作。
谢谢
import asyncio
import json
import websockets
from asyncqt import asyncSlot, QtCore
import pyqtgraph as pg
class Ws(QtCore.QObject):
    """QObject that owns a websocket connection and republishes its messages.

    Every message received from the socket is decoded with ``json.loads`` and
    emitted through the ``dataChanged`` signal.
    """

    # Emitted once per received message, carrying the decoded JSON payload.
    # The signal is declared with ``dict`` — assumes the server only sends
    # JSON objects; TODO confirm.
    dataChanged = QtCore.pyqtSignal(dict)

    def __init__(self, parent=None):
        """Create the object; no connection is opened until ``connect`` runs."""
        super().__init__(parent)
        # Stays None until connect() has finished opening the socket.
        self._websocket = None

    @property
    def websocket(self):
        """Return the current connection, or ``None`` if not connected yet."""
        return self._websocket

    async def connect(self, server):
        """Open a websocket connection to *server* and start receiving.

        ``on_message`` loops forever, so this coroutine does not return
        normally once the connection has been established.
        """
        self._websocket = await websockets.connect(server)
        await self.on_message()

    # NOTE(review): declared as a slot taking ``dict`` although the coroutine
    # itself takes no argument besides ``self`` — confirm this is intended.
    @asyncSlot(dict)
    async def on_message(self):
        """Receive messages forever and emit each one via ``dataChanged``."""
        while True:
            message = await self.websocket.recv()
            message = json.loads(message)
            self.dataChanged.emit(message)

    @asyncSlot(dict)
    async def send_message(self, message):
        """Serialize *message* with ``json.dumps`` and send it over the socket.

        May be called before the connection exists: the coroutine polls every
        0.2 seconds until ``websocket`` is no longer ``None``.
        """
        while self.websocket is None:
            await asyncio.sleep(0.2)
        data = json.dumps(message)
        await self.websocket.send(data)

    def run(self):
        """Schedule ``connect`` on the current asyncio loop and run it forever.

        The server URL is hard-coded. This call blocks, because it ends with
        ``loop.run_forever()``.
        """
        loop = asyncio.get_event_loop()
        loop.create_task(self.connect(server="wss://www.bitmex.com/realtime"))
        loop.run_forever()
class TestUi(object):
    """Open a pyqtgraph plot window and print every websocket message."""

    def __init__(self):
        """Create the plot and the websocket client, then run the loop.

        This constructor does not return normally: its last statement is
        ``self.ws.run()``, which runs the asyncio event loop forever.
        """
        # NOTE(review): no asyncqt event loop is installed in this version;
        # Ws.run() uses whatever asyncio.get_event_loop() returns.
        self.plt = pg.plot()
        self.ws = Ws()
        # The subscription request has to be passed as ONE dict argument
        # (it is serialized with json.dumps in Ws.send_message); without the
        # braces this call is a syntax error.
        self.ws.send_message(
            {"op": "subscribe", "args": ["instrument:XBTUSD"]}
        )
        self.ws.dataChanged.connect(self.update_data)
        self.ws.run()

    def update_data(self, data):
        """Handle one decoded message from ``Ws.dataChanged`` by printing it."""
        print(data)
if __name__ == "__main__":
    # Constructing TestUi runs the whole program: its __init__ ends by
    # calling Ws.run(), which runs the event loop forever.
    u = TestUi()
【问题讨论】:
【参考方案1】:你需要先创建 asyncqt 的 QEventLoop,并把它设置为 asyncio 的事件循环:
import asyncio
import json
import sys
import websockets
from PyQt5 import QtCore, QtWidgets
import pyqtgraph as pg
from asyncqt import QEventLoop, asyncSlot
class Ws(QtCore.QObject):
    """Websocket client exposed as a QObject.

    Incoming messages are parsed as JSON and forwarded to listeners through
    the ``dataChanged`` signal.
    """

    # Fired for each message read from the socket, with the parsed JSON value.
    # Declared with ``dict`` — presumably every server message is a JSON
    # object; verify against the server's protocol.
    dataChanged = QtCore.pyqtSignal(dict)

    def __init__(self, parent=None):
        """Initialise without a connection; ``connect`` opens it later."""
        super().__init__(parent)
        # None means "not connected yet"; send_message() polls on this.
        self._websocket = None

    @property
    def websocket(self):
        """The open connection, or ``None`` before ``connect`` has completed."""
        return self._websocket

    async def connect(self, server):
        """Connect to *server*, then enter the receive loop.

        Because ``on_message`` never finishes on its own, neither does this
        coroutine after a successful connection.
        """
        self._websocket = await websockets.connect(server)
        await self.on_message()

    # NOTE(review): the slot signature says ``dict`` but the coroutine accepts
    # no payload argument — confirm whether the decorator argument is needed.
    @asyncSlot(dict)
    async def on_message(self):
        """Loop forever: read a message, decode it, emit ``dataChanged``."""
        while True:
            message = await self.websocket.recv()
            message = json.loads(message)
            self.dataChanged.emit(message)

    @asyncSlot(dict)
    async def send_message(self, message):
        """Send *message* as JSON once the connection is available.

        Sleeps in 0.2-second steps while ``websocket`` is still ``None``, so
        the call can be issued before ``connect`` has completed.
        """
        while self.websocket is None:
            await asyncio.sleep(0.2)
        data = json.dumps(message)
        await self.websocket.send(data)

    def run(self):
        """Start the client on the current asyncio event loop and block.

        Creates a task for ``connect`` (with a hard-coded server URL) and then
        calls ``loop.run_forever()``.
        """
        loop = asyncio.get_event_loop()
        loop.create_task(self.connect(server="wss://www.bitmex.com/realtime"))
        loop.run_forever()
class TestUi(object):
    """Plot window plus websocket client running on a Qt-backed asyncio loop."""

    def __init__(self):
        """Set up Qt, install the asyncqt loop, subscribe and run forever.

        This constructor does not return normally: it ends with
        ``self.ws.run()``, which calls ``loop.run_forever()``.
        """
        app = QtWidgets.QApplication(sys.argv)
        # Make the Qt-backed loop the current asyncio loop; Ws.run() picks it
        # up through asyncio.get_event_loop().
        loop = QEventLoop(app)
        asyncio.set_event_loop(loop)

        self.plt = pg.plot()
        self.ws = Ws()
        # The request must be a single dict argument (Ws.send_message passes
        # it to json.dumps); without the braces the call is a syntax error.
        self.ws.send_message({"op": "subscribe", "args": ["instrument:XBTUSD"]})
        self.ws.dataChanged.connect(self.update_data)
        self.ws.run()

    def update_data(self, data):
        """Slot for ``Ws.dataChanged``: print the decoded message."""
        print(data)
if __name__ == "__main__":
    # TestUi.__init__ creates the QApplication and blocks in the event loop,
    # so instantiating it is the entire program.
    u = TestUi()
【讨论】:
以上是关于“带有 Gui 的 Websocket”的主要内容,如果未能解决你的问题,请参考以下文章。