Python 3 线程化 websockets 服务器
Posted
技术标签:
【中文标题】Python 3 线程化 websockets 服务器【英文标题】:Python 3 Threaded websockets server 【发布时间】:2016-05-11 19:47:49 【问题描述】:我正在 python 3 中构建一个 Websocket 服务器应用程序。我正在使用这个实现:https://websockets.readthedocs.io/
基本上我想管理多个客户端。 我还想从 2 个不同的线程发送数据(一个用于 GPS + 一个用于 IMU) GPS线程刷新1Hz,IMU线程刷新25Hz
我的问题出在 MSGWorker.sendData 方法中:只要我取消注释行 @asyncio.coroutine 并从 websocket.send('"GPS": "%s"' % data) 产生,整个方法什么都不做(终端没有打印(“发送数据:foo”))
然而,这两行注释我的代码可以正常工作,只是我没有通过 websocket 发送任何内容。
但是,当然,我的目标是通过 websocket 发送数据,我只是不明白为什么它不起作用?有什么想法吗?
server.py
#!/usr/bin/env python3
import signal, sys
sys.path.append('.')
import time
import websockets
import asyncio
import threading
connected = set()
stopFlag = False
class GPSWorker (threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.data = 0
self.lastData = 0
self.inc = 0
# Simulate GPS data
def run(self):
while not stopFlag:
self.data = self.inc
self.inc += 1
time.sleep(1)
def get(self):
if self.lastData is not self.data:
self.lastData = self.data
return self.data
class IMUWorker (threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.data = 0
self.lastData = 0
self.inc = 0
# Simulate IMU data
def run(self):
while not stopFlag:
self.data = self.inc
self.inc += 1
time.sleep(0.04)
def get(self):
if self.lastData is not self.data:
self.lastData = self.data
return self.data
class MSGWorker (threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while not stopFlag:
data = gpsWorker.get()
if data:
self.sendData('"GPS": "%s"' % data)
data = imuWorker.get()
if data:
self.sendData('"IMU": "%s"' % data)
time.sleep(0.04)
#@asyncio.coroutine
def sendData(self, data):
for websocket in connected.copy():
print("Sending data: %s" % data)
#yield from websocket.send('"GPS": "%s"' % data)
@asyncio.coroutine
def handler(websocket, path):
global connected
connected.add(websocket)
#TODO: handle client disconnection
# i.e connected.remove(websocket)
if __name__ == "__main__":
print('aeroPi server')
gpsWorker = GPSWorker()
imuWorker = IMUWorker()
msgWorker = MSGWorker()
try:
gpsWorker.start()
imuWorker.start()
msgWorker.start()
start_server = websockets.serve(handler, 'localhost', 7700)
loop = asyncio.get_event_loop()
loop.run_until_complete(start_server)
loop.run_forever()
except KeyboardInterrupt:
stopFlag = True
loop.close()
print("Exiting program...")
client.html
<!doctype html>
<html>
<head>
<meta charset="UTF-8" />
</head>
<body>
</body>
</html>
<script type="text/javascript">
var ws = new WebSocket("ws://localhost:7700", 'json');
ws.onmessage = function (e)
var data = JSON.parse(e.data);
console.log(data);
;
</script>
感谢您的帮助
【问题讨论】:
【参考方案1】:我终于明白了! 它需要 Python 3.5.1(而我的发行版只提供 3.4.3)和来自 websockets 库的作者 Aymeric 的一些帮助(感谢他)。
#!/usr/bin/env python3
import signal, sys
sys.path.append('.')
import time
import websockets
import asyncio
import threading
stopFlag = False
class GPSWorker (threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.data = 0
self.lastData = 0
self.inc = 0
# Simulate GPS data
def run(self):
while not stopFlag:
self.data = self.inc
self.inc += 1
time.sleep(1)
def get(self):
if self.lastData is not self.data:
self.lastData = self.data
return self.data
class IMUWorker (threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.data = 0
self.lastData = 0
self.inc = 0
# Simulate IMU data
def run(self):
while not stopFlag:
self.data = self.inc
self.inc += 1
time.sleep(0.04)
def get(self):
if self.lastData is not self.data:
self.lastData = self.data
return self.data
class MSGWorker (threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.connected = set()
def run(self):
while not stopFlag:
data = gpsWorker.get()
if data:
self.sendData('"GPS": "%s"' % data)
data = imuWorker.get()
if data:
self.sendData('"IMU": "%s"' % data)
time.sleep(0.04)
async def handler(self, websocket, path):
self.connected.add(websocket)
try:
await websocket.recv()
except websockets.exceptions.ConnectionClosed:
pass
finally:
self.connected.remove(websocket)
def sendData(self, data):
for websocket in self.connected.copy():
print("Sending data: %s" % data)
coro = websocket.send(data)
future = asyncio.run_coroutine_threadsafe(coro, loop)
if __name__ == "__main__":
print('aeroPi server')
gpsWorker = GPSWorker()
imuWorker = IMUWorker()
msgWorker = MSGWorker()
try:
gpsWorker.start()
imuWorker.start()
msgWorker.start()
ws_server = websockets.serve(msgWorker.handler, '0.0.0.0', 7700)
loop = asyncio.get_event_loop()
loop.run_until_complete(ws_server)
loop.run_forever()
except KeyboardInterrupt:
stopFlag = True
#TODO: close ws server and loop correctely
print("Exiting program...")
问候, 克莱门特
【讨论】:
您已经有一段时间没有发布答案了。您是否开发了您的解决方案?我现在正在做类似的事情,想为自己省点麻烦:) 你是怎么关闭你的ws服务器和循环的?当KeyboardInterrupt
被提升时,我有一些线程挂起。
@DC_ACexcept KeyboardInterrupt: stopFlag = True ws_server.close() loop.stop() print("Exiting program...")以上是关于Python 3 线程化 websockets 服务器的主要内容,如果未能解决你的问题,请参考以下文章
使用 Python websocket-client 多线程
[Go] Golang练习项目-web客服系统即时通讯websocket项目go-fly
[WebSocket] 开发在线客服系统知识点-websocket返回状态码的含义