如何跨多个服务器/工作人员管理 websocket
Posted
技术标签:
【中文标题】如何跨多个服务器/工作人员管理 websocket【英文标题】:How to manage websockets across multiple servers / workers 【发布时间】:2016-06-19 15:33:16 【问题描述】:aiohttp 具有对websockets 的内置支持。它非常简单并且效果很好。
文档中示例的简化版本是:
async def handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
# Async iterate the messages the client sends
async for message in ws:
ws.send_str('You sent: %s' % (message.data,))
print('websocket connection closed')
在示例中,ws
是对与客户端的 websocket 连接的引用。我可以轻松地将这些引用放入request.app
,例如@Crandel does here(即全局状态),但不能放入生产应用程序中,因为每个应用程序服务器(甚至每个工作人员)都会有自己的app
实例。
这有可接受的模式吗?还有其他方法吗?
注意:我指的不是会话。我指的是连接。当服务器 B 的应用程序代码中发生事件等时,我想向连接到服务器 A 的客户端发送消息。
【问题讨论】:
【参考方案1】:所以我只熟悉 Node 中的 Socket.IO,但使用 Socket.IO 水平扩展 websocket 相当容易。
套接字可以随会话一起提供,因此每个会话都由特定的服务器管理。这样可以轻松保存每个打开的套接字的状态,并在所有服务器之间实现负载平衡。
这里是 Python 的 SocketIO:
https://pypi.python.org/pypi/socketIO-client
这是一本非常好的读物,介绍了如何将会话附加到 redis 存储以使其更快并且更易于管理跨服务器的负载平衡。
How to share sessions with Socket.IO 1.x and Express 4.x?
我知道这并不能回答您关于 aiohttp 的问题,但希望这能让您更好地了解套接字是如何工作的。
编辑: 用Node写的-
在 Socket.IO 中这真的很简单,它有很多函数可以以各种不同的方式广播消息。
例如,如果您想向每个聊天室中的每个人发送消息。示例每个打开套接字的人都可以轻松编写。
socket.broadcast.emit('WARNING', "this is a test");
假设您有开放的房间,您可以使用名为.to()
的简单函数仅向该房间中的人广播消息。示例我有一个名为“BBQ”的房间:
socket.broadcast.to('BBQ').emit('invitation', 'Come get some food!');
这将在频道烧烤中向所有人发送消息 - 来吃点东西!
编辑:编辑:
这是一篇关于 Socket.IO 工作原理的精彩文章,请确保您阅读了函数更新版本的第二个答案。它比他们的文档更容易阅读。
Send response to all clients except sender (Socket.io)
据我所知,这在 python 实现中也是如此。为了便于使用,我当然会将它用于 websockets。 aiohttp 看起来确实很强大,但要么没有这个功能,要么隐藏在文档中,要么只写在没有任何文档的代码中。
【讨论】:
这不是我要问的。使用aiohttp-session.readthedocs.org/en/latest 可以轻松管理会话 - 我指的是与客户端的实际连接。换句话说,给定服务器 A 中发生的事件,我如何向已连接到服务器 B 的客户端发送消息。 您是否希望它专门用于连接到服务器 B 的人,即使您有服务器 c 和服务器 d,还是要向所有可能登录到您服务器的客户端发出事件? 想想类似聊天服务器的东西 - 如果连接到服务器 A 的用户发送消息,那么我需要将其发送给同一聊天室中的用户,即使其中一些用户连接到服务器B、C 等。这样有意义吗? 啊,好吧,我可以使用 Node SocketIO 实现来编辑我对这个问题的答案,但我不知道这是否会对你的 aiohttp 有所帮助。我从来没有在 Node.js 中使用过 aiohttp 或 websockets。我道歉。如果你想添加另一个库来做 websockets,使用 SocketIO 很容易。 啊,其实这对我很有帮助,因为我也知道 Node。【参考方案2】:更新(2017 年 2 月)
Channels (幸运的是)没有合并到 Django 中。它可能仍然是一个伟大的项目,但它并不真正属于 Django。
另外,我强烈建议您看一下 Postgres 相对较新的对 pub/sub 的内置支持。它将probably outperform anything else,并在 aiohttp 上构建自定义解决方案,使用 Postgres 作为支持服务,可能是您最好的选择。
原创
虽然不是aiohttp,Django Channels,很可能会合并到Django 1.10中,很直观的解决了这个问题,作者Andrew Godwin,Django的作者migrations。
Django Channels 通过在 Django 应用程序前面创建路由层来抽象“许多服务器上的许多进程”的概念。该路由层与后端(例如 Redis)对话以在进程之间保持可共享状态,并使用新的ASGI 协议来促进处理 HTTP 请求和 WebSocket,同时将每个请求委托给它们各自的“consumers”(例如, 附带一个内置的 HTTP 请求处理程序,您可以为 WebSockets 编写自己的处理程序。
Django Channels 有一个名为Groups 的概念,它处理问题的“广播”性质;也就是说,它允许服务器上发生的事件向该组中的客户端触发消息,无论它们连接到相同还是不同的进程或服务器。
恕我直言,Django Channels 很可能被抽象为更通用的 Python 库。有一个couple other Python libraries 可以实现Go-like Channels,但是在撰写本文时,没有什么值得注意的可以提供网络透明度; Channels 在进程和服务器之间进行通信的能力。
【讨论】:
【参考方案3】:如果我的理解正确,您希望拥有多个 websocket 服务器,每个服务器连接多个客户端,但您希望能够与所有连接的客户端进行潜在通信。
这是一个创建三个普通服务器的示例——一个大写回显、一个随机报价和一天中的时间——然后向所有连接的客户端发送一条广播消息。也许这里面有一些有用的想法。
Pastebin:https://pastebin.com/xDSACmdV
#!/usr/bin/env python3
"""
Illustrates how to have multiple websocket servers running and send
messages to all their various clients at once.
In response to *** question:
https://***.com/questions/35820782/how-to-manage-websockets-across-multiple-servers-workers
Pastebin: https://pastebin.com/xDSACmdV
"""
import asyncio
import datetime
import random
import time
import webbrowser
import aiohttp
from aiohttp import web
__author__ = "Robert Harder"
__email__ = "rob@iharder.net"
__license__ = "Public Domain"
def main():
# Create servers
cap_srv = CapitalizeEchoServer(port=9990)
rnd_srv = RandomQuoteServer(port=9991)
tim_srv = TimeOfDayServer(port=9992)
# Queue their start operation
loop = asyncio.get_event_loop()
loop.create_task(cap_srv.start())
loop.create_task(rnd_srv.start())
loop.create_task(tim_srv.start())
# Open web pages to test them
webtests = [9990, 9991, 9991, 9992, 9992]
for port in webtests:
url = "http://www.websocket.org/echo.html?location=ws://localhost:".format(port)
webbrowser.open(url)
print("Be sure to click 'Connect' on the webpages that just opened.")
# Queue a simulated broadcast-to-all message
def _alert_all(msg):
print("Sending alert:", msg)
msg_dict = "alert": msg
cap_srv.broadcast_message(msg_dict)
rnd_srv.broadcast_message(msg_dict)
tim_srv.broadcast_message(msg_dict)
loop.call_later(17, _alert_all, "ALL YOUR BASE ARE BELONG TO US")
# Run event loop
loop.run_forever()
class MyServer:
def __init__(self, port):
self.port = port # type: int
self.loop = None # type: asyncio.AbstractEventLoop
self.app = None # type: web.Application
self.srv = None # type: asyncio.base_events.Server
async def start(self):
self.loop = asyncio.get_event_loop()
self.app = web.Application()
self.app["websockets"] = [] # type: [web.WebSocketResponse]
self.app.router.add_get("/", self._websocket_handler)
await self.app.startup()
handler = self.app.make_handler()
self.srv = await asyncio.get_event_loop().create_server(handler, port=self.port)
print(" listening on port ".format(self.__class__.__name__, self.port))
async def close(self):
assert self.loop is asyncio.get_event_loop()
self.srv.close()
await self.srv.wait_closed()
for ws in self.app["websockets"]: # type: web.WebSocketResponse
await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')
await self.app.shutdown()
await self.app.cleanup()
async def _websocket_handler(self, request):
assert self.loop is asyncio.get_event_loop()
ws = web.WebSocketResponse()
await ws.prepare(request)
self.app["websockets"].append(ws)
await self.do_websocket(ws)
self.app["websockets"].remove(ws)
return ws
async def do_websocket(self, ws: web.WebSocketResponse):
async for ws_msg in ws: # type: aiohttp.WSMessage
pass
def broadcast_message(self, msg: dict):
for ws in self.app["websockets"]: # type: web.WebSocketResponse
ws.send_json(msg)
class CapitalizeEchoServer(MyServer):
""" Echoes back to client whatever they sent, but capitalized. """
async def do_websocket(self, ws: web.WebSocketResponse):
async for ws_msg in ws: # type: aiohttp.WSMessage
cap = ws_msg.data.upper()
ws.send_str(cap)
class RandomQuoteServer(MyServer):
""" Sends a random quote to the client every so many seconds. """
QUOTES = ["Wherever you go, there you are.",
"80% of all statistics are made up.",
"If a tree falls in the woods, and no one is around to hear it, does it make a noise?"]
def __init__(self, interval: float = 10, *kargs, **kwargs):
super().__init__(*kargs, **kwargs)
self.interval = interval
async def do_websocket(self, ws: web.WebSocketResponse):
async def _regular_interval():
while self.srv.sockets is not None:
quote = random.choice(RandomQuoteServer.QUOTES)
ws.send_json("quote": quote)
await asyncio.sleep(self.interval)
self.loop.create_task(_regular_interval())
await super().do_websocket(ws) # leave client connected here indefinitely
class TimeOfDayServer(MyServer):
""" Sends a message to all clients simultaneously about time of day. """
async def start(self):
await super().start()
async def _regular_interval():
while self.srv.sockets is not None:
if int(time.time()) % 10 == 0: # Only on the 10 second mark
timestamp = ":%Y-%m-%d %H:%M:%S".format(datetime.datetime.now())
self.broadcast_message("timestamp": timestamp)
await asyncio.sleep(1)
self.loop.create_task(_regular_interval())
if __name__ == "__main__":
main()
【讨论】:
嗨@rharder,我希望我的 100 位客户将他们的 base64 图像发送到在 jetson-nano 板上运行的单个服务器。所以我想异步发生。所以请您指教上面的代码sn-p是否适合我的项目? @SaddamBinSyed 是的,我想你可以用这个。以上是关于如何跨多个服务器/工作人员管理 websocket的主要内容,如果未能解决你的问题,请参考以下文章