Python 在线程类函数中广播更新的 Web-socket 数据
Posted
技术标签:
【中文标题】Python 在线程类函数中广播更新的 Web-socket 数据【英文标题】:Python broadcasting updated Web-socket data in a thread class function 【发布时间】:2020-10-02 03:28:52 【问题描述】:我创建了一个简单的 python web-socket 服务器,如下所示
import asyncio
import json
import websockets
import threading
import queue
import time
import logging
logging.basicConfig()
STATE = "value": 0
USERS = set()
msg_queue = queue.Queue(256)
def state_event():
return json.dumps("type": "state", **STATE)
def users_event():
return json.dumps("type": "users", "count": len(USERS))
async def notify_state():
if USERS: # asyncio.wait doesn't accept an empty list
message = state_event()
await asyncio.wait([user.send(message) for user in USERS])
async def notify_users():
if USERS: # asyncio.wait doesn't accept an empty list
message = users_event()
await asyncio.wait([user.send(message) for user in USERS])
async def register(websocket):
USERS.add(websocket)
await notify_users()
async def unregister(websocket):
USERS.remove(websocket)
await notify_users()
async def counter(websocket, path):
# register(websocket) sends user_event() to websocket
await register(websocket)
try:
await websocket.send(state_event())
async for message in websocket:
data = json.loads(message)
if data["action"] == "minus":
STATE["value"] -= 1
await notify_state()
elif data["action"] == "plus":
STATE["value"] += 1
await notify_state()
else:
logging.error("unsupported event: ", data)
finally:
await unregister(websocket)
class clsSerialThread(threading.Thread):
global STATE
def __init__(self, threadID, name, job_queue):
super().__init__()
self.threadID = threadID
self.name = name
self.job_queue = job_queue
def run(self):
while(True):
time.sleep(10)
#broadcast an updated value every 10 seconds
for ws in USERS:
asyncio.create_task(ws.send(json.dumps("value": STATE["value"]+10)))
if __name__ == "__main__":
try:
serial_thread = clsSerialThread(1, 'Serial', msg_queue)
serial_thread.setDaemon(True)
serial_thread.start()
except Exception as e1:
print("error communicating to mainboard ...: " + str(e1))
asyncio.get_event_loop().run_until_complete(websockets.serve(counter, "localhost", "8081"))
asyncio.get_event_loop().run_forever()
使用 html 客户端测试我的服务器
<!DOCTYPE html>
<html>
<head>
<title>WebSocket demo</title>
</head>
<body>
<div class="buttons">
<div class="minus button">-</div>
<div class="value">?</div>
<div class="plus button">+</div>
</div>
<div class="state">
<span class="users">?</span> online
</div>
<script>
var minus = document.querySelector('.minus'),
plus = document.querySelector('.plus'),
value = document.querySelector('.value'),
users = document.querySelector('.users'),
websocket = new WebSocket("ws://localhost:8081/");
minus.onclick = function (event)
websocket.send(JSON.stringify(action: 'minus'));
plus.onclick = function (event)
websocket.send(JSON.stringify(action: 'plus'));
websocket.onmessage = function (event)
data = JSON.parse(event.data);
switch (data.type)
case 'state':
value.textContent = data.value;
break;
case 'users':
users.textContent = (
data.count.toString() + " user" +
(data.count == 1 ? "" : "s"));
break;
default:
console.error(
"unsupported event", data);
;
</script>
</body>
</html>
但是,run 函数不断抛出错误,例如“发生异常:RuntimeError 没有正在运行的事件循环”
如何在线程运行函数中更新 web-socket 数据?我在函数中添加了 async 关键字,它没有帮助
【问题讨论】:
【参考方案1】:我发现需要将一个事件循环全局实例共享给线程类,如下:
import asyncio
import json
import websockets
import threading
import queue
import time
import logging
logging.basicConfig()
STATE = "value": 0
USERS = set()
msg_queue = queue.Queue(256)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
delay = 5
def state_event():
global STATE
STATE["value"] += delay
return json.dumps("type": "state", **STATE)
def users_event():
return json.dumps("type": "users", "count": len(USERS))
async def notify_state():
if USERS: # asyncio.wait doesn't accept an empty list
message = state_event()
await asyncio.wait([user.send(message) for user in USERS])
async def notify_users():
if USERS: # asyncio.wait doesn't accept an empty list
message = users_event()
await asyncio.wait([user.send(message) for user in USERS])
async def register(websocket):
USERS.add(websocket)
await notify_users()
async def unregister(websocket):
USERS.remove(websocket)
await notify_users()
async def counter(websocket, path):
# register(websocket) sends user_event() to websocket
await register(websocket)
try:
await websocket.send(state_event())
async for message in websocket:
data = json.loads(message)
if data["action"] == "minus":
STATE["value"] -= 1
await notify_state()
elif data["action"] == "plus":
STATE["value"] += 1
await notify_state()
else:
logging.error("unsupported event: ", data)
finally:
await unregister(websocket)
class clsSerialThread(threading.Thread):
global STATE
global loop
def __init__(self, threadID, name, job_queue):
super().__init__()
self.threadID = threadID
self.name = name
self.job_queue = job_queue
def run(self):
while(True):
time.sleep(delay)
#broadcast an updated value every 'delay' seconds
loop.create_task(notify_state())
if __name__ == "__main__":
try:
serial_thread = clsSerialThread(1, 'Serial', msg_queue)
serial_thread.setDaemon(True)
serial_thread.start()
except Exception as e1:
print("error communicating to mainboard ...: " + str(e1))
loop.run_until_complete(websockets.serve(counter, "localhost", "8081"))
loop.run_forever()
现在似乎正在定期调用广播更新,但 websocket 客户端网页的更新频率似乎与延迟设置不匹配。有没有办法改善延迟?
【讨论】:
以上是关于Python 在线程类函数中广播更新的 Web-socket 数据的主要内容,如果未能解决你的问题,请参考以下文章