Python 在线程类函数中广播更新的 Web-socket 数据

Posted

技术标签:

【中文标题】Python 在线程类函数中广播更新的 Web-socket 数据【英文标题】:Python broadcasting updated Web-socket data in a thread class function 【发布时间】:2020-10-02 03:28:52 【问题描述】:

我创建了一个简单的 python web-socket 服务器,如下所示

import asyncio
import json
import websockets
import threading
import queue
import time
import logging


logging.basicConfig()

STATE = "value": 0

USERS = set()

msg_queue = queue.Queue(256)
def state_event():
    return json.dumps("type": "state", **STATE)


def users_event():
    return json.dumps("type": "users", "count": len(USERS))


async def notify_state():
    if USERS:  # asyncio.wait doesn't accept an empty list
        message = state_event()
        await asyncio.wait([user.send(message) for user in USERS])


async def notify_users():
    if USERS:  # asyncio.wait doesn't accept an empty list
        message = users_event()
        await asyncio.wait([user.send(message) for user in USERS])


async def register(websocket):
    USERS.add(websocket)
    await notify_users()


async def unregister(websocket):
    USERS.remove(websocket)
    await notify_users()


async def counter(websocket, path):
    # register(websocket) sends user_event() to websocket
    await register(websocket)
    try:
        await websocket.send(state_event())
        async for message in websocket:
            data = json.loads(message)
            if data["action"] == "minus":
                STATE["value"] -= 1
                await notify_state()
            elif data["action"] == "plus":
                STATE["value"] += 1
                await notify_state()
            else:
                logging.error("unsupported event: ", data)
    finally:
        await unregister(websocket)

class clsSerialThread(threading.Thread):  
    global STATE    
    def __init__(self, threadID, name, job_queue):
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.job_queue = job_queue

    def run(self):
        while(True):
            time.sleep(10)
            #broadcast an updated value every 10 seconds
            for ws in USERS: 
                asyncio.create_task(ws.send(json.dumps("value": STATE["value"]+10)))
                   
if __name__ == "__main__":
    try:
        serial_thread = clsSerialThread(1, 'Serial', msg_queue)
        serial_thread.setDaemon(True)
        serial_thread.start()
    except Exception as e1:
        print("error communicating to mainboard ...: " + str(e1))

    asyncio.get_event_loop().run_until_complete(websockets.serve(counter, "localhost", "8081"))
    asyncio.get_event_loop().run_forever()

使用 html 客户端测试我的服务器

<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket demo</title>
    </head>
    <body>
        <div class="buttons">
            <div class="minus button">-</div>
            <div class="value">?</div>
            <div class="plus button">+</div>
        </div>
        <div class="state">
            <span class="users">?</span> online
        </div>
        <script>
            var minus = document.querySelector('.minus'),
                plus = document.querySelector('.plus'),
                value = document.querySelector('.value'),
                users = document.querySelector('.users'),
                websocket = new WebSocket("ws://localhost:8081/");
            minus.onclick = function (event) 
                websocket.send(JSON.stringify(action: 'minus'));
            
            plus.onclick = function (event) 
                websocket.send(JSON.stringify(action: 'plus'));
            
            websocket.onmessage = function (event) 
                data = JSON.parse(event.data);
                switch (data.type) 
                    case 'state':
                        value.textContent = data.value;
                        break;
                    case 'users':
                        users.textContent = (
                            data.count.toString() + " user" +
                            (data.count == 1 ? "" : "s"));
                        break;
                    default:
                        console.error(
                            "unsupported event", data);
                
            ;
        </script>
    </body>
</html>

但是,run 函数不断抛出错误,例如“发生异常:RuntimeError 没有正在运行的事件循环”

如何在线程运行函数中更新 web-socket 数据?我在函数中添加了 async 关键字,它没有帮助

【问题讨论】:

【参考方案1】:

我发现需要将一个事件循环全局实例共享给线程类,如下:

import asyncio
import json
import websockets
import threading
import queue
import time
import logging


logging.basicConfig()

STATE = "value": 0

USERS = set()

msg_queue = queue.Queue(256)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

delay = 5
def state_event():
    global STATE
    STATE["value"] += delay
    return json.dumps("type": "state", **STATE)


def users_event():
    return json.dumps("type": "users", "count": len(USERS))


async def notify_state():
    if USERS:  # asyncio.wait doesn't accept an empty list
        message = state_event()
        await asyncio.wait([user.send(message) for user in USERS])


async def notify_users():
    if USERS:  # asyncio.wait doesn't accept an empty list
        message = users_event()
        await asyncio.wait([user.send(message) for user in USERS])


async def register(websocket):
    USERS.add(websocket)
    await notify_users()


async def unregister(websocket):
    USERS.remove(websocket)
    await notify_users()


async def counter(websocket, path):
    # register(websocket) sends user_event() to websocket
    await register(websocket)
    try:
        await websocket.send(state_event())
        async for message in websocket:
            data = json.loads(message)
            if data["action"] == "minus":
                STATE["value"] -= 1
                await notify_state()
            elif data["action"] == "plus":
                STATE["value"] += 1
                await notify_state()
            else:
                logging.error("unsupported event: ", data)
    finally:
        await unregister(websocket)

class clsSerialThread(threading.Thread):  
    global STATE
    global loop    
    def __init__(self, threadID, name, job_queue):
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.job_queue = job_queue

    def run(self):
        while(True):
            time.sleep(delay)
            #broadcast an updated value every 'delay' seconds
            loop.create_task(notify_state())
                   
if __name__ == "__main__":
    try:
        serial_thread = clsSerialThread(1, 'Serial', msg_queue)
        serial_thread.setDaemon(True)
        serial_thread.start()
    except Exception as e1:
        print("error communicating to mainboard ...: " + str(e1))

    loop.run_until_complete(websockets.serve(counter, "localhost", "8081"))
    loop.run_forever()

现在似乎正在定期调用广播更新,但 websocket 客户端网页的更新频率似乎与延迟设置不匹配。有没有办法改善延迟?

【讨论】:

以上是关于Python 在线程类函数中广播更新的 Web-socket 数据的主要内容,如果未能解决你的问题,请参考以下文章

防止在 Pandas 中广播 [重复]

CUDA 中每个块的数据——它是不是在一个事务中广播?

在Flink中广播HashMap

Android中广播接收者BroadcastReceiver详解

让动作电缆在 Sidekiq 作业中广播

Android中广播的发送BroadcastReceiver