Python Tornado 从另一个线程发送 WebSocket 消息

Posted

技术标签:

【中文标题】Python Tornado 从另一个线程发送 WebSocket 消息【英文标题】:Python Tornado send WebSocket messages from another thread 【发布时间】:2020-07-23 10:39:35 【问题描述】:

我想在 Python 中使用 WebSockets 来让 Web 客户端及时了解我使用 PySerial 从串行端口读取的数据。我目前正在使用以下代码通过单独的线程连续读取串行数据

def read_from_port():
    while running:
        reading = ser.readline().decode()
        handle_data(reading)

thread = threading.Thread(target=read_from_port)
thread.daemon = True
thread.start()

我正在对串行数据进行一些处理,然后如果计算结果与其先前的值不同,我想向所有连接的 WebSocket 客户端广播一条消息。为此,我设置了以下代码

clients = []

def Broadcast(message):
    for client in clients:
        client.sendMessage(json.dumps(message).encode('utf8'))
        print("broadcasted")

worker.broadcast = Broadcast

class WSHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print('new connection')
        clients.append(self)
      
    def on_message(self, message):
        print('message received:  %s' % message)
        response = handler.HandleRequest(message, self.write_message)
 
    def on_close(self):
        print('connection closed')
        clients.remove(self)
 
    def check_origin(self, origin):
        return True
 
application = tornado.web.Application([
    (r'/ws', WSHandler),
])
 
if __name__ == "__main__":
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8765)
    myIP = socket.gethostbyname(socket.gethostname())
    print('*** Websocket Server Started at %s***' % myIP)
    tornado.ioloop.IOLoop.instance().start()

然后我想在工作人员中使用“广播”方法来广播结果。在工作线程中使用此方法会产生以下错误

File "main.py", line 18, in Broadcast
    client.write_message(message)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 342, in write_message
    return self.ws_connection.write_message(message, binary=binary)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1098, in write_message
    fut = self._write_frame(True, opcode, message, flags=flags)
  File "/usr/local/lib/python3.8/site-packages/tornado/websocket.py", line 1075, in _write_frame
    return self.stream.write(frame)
  File "/usr/local/lib/python3.8/site-packages/tornado/iostream.py", line 555, in write
    future = Future()  # type: Future[None]
  File "/usr/local/Cellar/python@3.8/3.8.3_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.

我了解问题是 Tornado write_message 函数不是线程安全的,并且由于我试图直接从工作线程调用该函数而产生此错误。据我所知,在 Tornado 中使用并发代码的推荐方法是通过 asyncio,但我认为线程方法在这种情况下可能更合适,因为我有一个基本上是不断并行运行的循环。

不幸的是,我对 asyncio 以及如何在 Python 中实现线程知之甚少,所以我想知道从不同线程发送 WebSocket 消息的最简单方法是什么。

【问题讨论】:

请注意,方法或函数名称的首字母大写是个坏主意,因为这样很难理解名称是类还是函数。惯例是对类使用大写的名称,对函数使用小写的名称。 Python 中的多线程很好。问题是这些可怕的网络服务器软件包。是的,上面的代码失败了——有时 iostream 是从调用线程编写的,有时是从“事件循环”编写的。所以断言在 iostream 中失败。整个事情是不必要的复杂。 【参考方案1】:

在https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading 阅读有关同时使用 asyncio 和多​​线程的官方文档给了我必要的线索,即使用“call_soon_threadsafe”函数可以非常优雅地实现这一点。因此,以下代码似乎可以解决问题

tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)

clients = []

def bcint(message):
    for client in clients:
        client.write_message(message)
        print("broadcasted")

def Broadcast(message):
    io_loop.asyncio_loop.call_soon_threadsafe(bcint, message)

worker.broadcast = Broadcast

class WSHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print('new connection')
        clients.append(self)
      
    def on_message(self, message):
        print('message received:  %s' % message)
        response = handler.HandleRequest(message, self.write_message)
 
    def on_close(self):
        print('connection closed')
        clients.remove(self)
 
    def check_origin(self, origin):
        return True
 
application = tornado.web.Application([
    (r'/ws', WSHandler),
])
 
if __name__ == "__main__":
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8765)
    myIP = socket.gethostbyname(socket.gethostname())
    print('*** Websocket Server Started at %s***' % myIP)
    tornado.ioloop.IOLoop.current().start()

【讨论】:

AttributeError: 'module' 对象没有属性 'asyncio_loop'。也许你的答案是假设 Python 3?问题没有说 Python 3。【参考方案2】:

一个更简洁的选择是使用队列,例如pyzmq,这将帮助您建立从一个线程到另一个线程的通信。

查看您的用例,您可以使用 PUB/SUB 模型。这是sample code。此外,您可以使用“inproc”而不是“tcp”。这将减少延迟,因为您将在同一进程中的多个线程之间进行通信。

【讨论】:

以上是关于Python Tornado 从另一个线程发送 WebSocket 消息的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法从另一个进程向线程发送信号?

从另一个线程向 QSerialPort 发送数据

从另一个线程向 uWebSockets 0.15.x 套接字发送数据

Python Tornado简介

Sockjs - 在 Python 代码中向 sockjs-tornado 发送消息

知乎为啥使用Tornado?使用Python中的多线程特性了吗