如何在服务器端发出 SocketIO 事件
Posted
技术标签:
【中文标题】如何在服务器端发出 SocketIO 事件【英文标题】:How to emit SocketIO event on the serverside 【发布时间】:2013-03-10 06:19:00 【问题描述】:我正在运行一个 gevent-socketio Django 应用程序。
我有类似这个类的东西
@namespace('/connect')
class ConnectNamespace(BaseNamespace):
def on_send(self, data):
# ...
但是,如果我从 javascript 客户端收到事件,一切正常,例如 send
事件得到正确处理
如果我想emit
服务器端的某个事件,我会有点迷茫。我可以在课堂上使用socket.send_packet
但现在我想将一些事件链接到post_save
信号,所以我想从这个命名空间类外部send_packet
,这样做的一种方法是
ConnectNamespaceInstance.on_third_event('someeventname')
我只是不知道如何获取 ConnectNamespaceInstance 的实例
综上所述,我只想在收到post_save
信号后向javascript客户端发送一个事件
【问题讨论】:
***.com/questions/13504320/… 我遇到了同样的问题,但无法解决...有什么想法吗? 很抱歉,但我认为建议的重复项不能回答我的问题 您想与专用客户端通信还是广播事件? 一旦我有了类的实例,我想我可以同时做,我只需要使用BroadcastMixin
的方法将它发送给每个人
【参考方案1】:
您可能想要做的是添加一个模块变量来跟踪连接,比如_connections
,如下所示:
_connections =
@namespace('/connect')
class ConnectNamespace(BaseNamespace):
然后添加 initialize
和 disconnect
方法,这些方法使用一些您可以稍后引用的快乐标识符:
def initialize(self, *args, **kwargs):
_connections[id(self)] = self
super(ConnectNamespace, self).initialize(*args, **kwargs)
def disconnect(self, *args, **kwargs):
del _connections[id(self)]
super(ConnectNamespace, self).disconnect(*args, **kwargs)
当您需要生成事件时,您只需在_connections
变量中查找正确的连接,然后使用emit
触发事件。
(没有对此进行任何测试,但我在许多其他语言中使用过类似的模式:看不出有任何理由说明这在 Python 中也不起作用)。
【讨论】:
我想知道框架中是否有一些内置工具可以做到这一点,但这是我应该想到的一个非常好的解决方法! +1 :) 谢谢 任何使用此解决方案广播到频道的示例? 这帮助我在 Django 中的 post_save 信号上发送消息。非常好的解决方案。【参考方案2】:除了 Femi 的回答之外,我认为这肯定有效。使用 Redis 可能会给您更多的灵活性,并且使用 gevent 中的 greenlet 可能会使这种方法更加“在框架中”,因为您已经在使用 gevent-socketio :D
REDIS_HOST = getattr(settings, 'REDIS_HOST', '127.0.0.1')
class YourNamespace(BaseNamespace):
def _listener(self, channel_label_you_later_call_in_post_save):
pubsub = redis.StrictRedis(REDIS_HOST).pubsub()
pubsub.subscribe(chan)
while True:
for i in pubsub.listen():
self.send('message_data': i, json=True)
def recv_message(self, message):
if is_message_to_subscribe(message):
self.spawn(self.listener, get_your_channel_label(message))
在你的 post_save 中,你可以这样做
red = redis.StrictRedis(REDIS_HOST)
red.publish(channel_label, message_data)
【讨论】:
以上是关于如何在服务器端发出 SocketIO 事件的主要内容,如果未能解决你的问题,请参考以下文章
以设定的时间间隔从 flask_socketio 服务器发出事件
使用 pytest 测试 flask-socketio 服务器发出的事件时出错
向特定用户发出的 SocketIO(不为每个客户端使用单独的房间)