计算水平服务器上的 socket.io 用户
Posted
技术标签:
【中文标题】计算水平服务器上的 socket.io 用户【英文标题】:Counting socket.io users across horizontal servers 【发布时间】:2012-08-23 06:53:21 【问题描述】:我有多个使用 redisstore 水平扩展的 socket.io 服务器。我已经有效地设置了房间,并且能够成功地跨服务器广播到房间等。现在我正在尝试构建一个状态页面,而我未能弄清楚的是如何简单地计算连接的用户数量所有服务器。
io.sockets.clients('room') 和 io.sockets.sockets 只会告诉你那台服务器上连接的客户端数量,而不是连接到同一个 RedisStore 的所有服务器。
建议?
谢谢。
【问题讨论】:
为什么不直接查询每台服务器,然后将连接的客户端数量相加? 我也在寻找一种方法来回答这个问题,而不必为此设置某种观察者。不过,FWIW 看起来逻辑是每个服务器都知道连接到所有服务器的所有客户端 - 但也可能有与另一台服务器断开连接的陈旧客户端。看起来 socket.io 认为在其他服务器上修剪陈旧客户端的开销是不值得的,相反,一些服务器只会广播到一些空白处。 【参考方案1】:这是我使用 Redis 脚本解决它的方法。它需要 2.6 或更高版本,因此目前很可能仍需要编译您自己的实例。
每次进程启动时,我都会生成一个新的 UUID 并将其保留在全局范围内。我可以使用 pid,但这感觉更安全。
# Pardon my coffeescript
processId = require('node-uuid').v4()
当用户连接时(socket.io 连接事件),然后我将该用户的 id 推送到基于该 processId 的用户列表中。我还将该密钥的到期时间设置为 30 秒。
RedisClient.lpush "process:#processId", user._id
RedisClient.expire "process:#processId", 30
当用户断开连接(断开连接事件)时,我将其删除并更新到期时间。
RedisClient.lrem "process:#processId", 1, user._id
RedisClient.expire "process:#processId", 30
我还设置了一个以 30 秒间隔运行的函数,以基本上“ping”该键,使其保持在那里。因此,如果进程确实意外终止,所有这些用户会话将基本上消失。
setInterval ->
RedisClient.expire "process:#processId", 30
, 30 * 1000
现在是魔术。 Redis 2.6 包含 LUA 脚本,它本质上提供了一种存储过程的功能。它真的很快而且不是很占用处理器(他们将其与“几乎”运行的 C 代码进行比较)。
我的存储过程基本上循环遍历所有进程列表,并创建一个 user:user_id 键及其当前登录总数。这意味着,如果他们使用两个浏览器等登录,它仍然允许我使用逻辑来判断他们是完全断开连接,还是只是其中一个会话。
我在所有进程上每 15 秒运行一次此函数,并且在连接/断开连接事件之后也是如此。这意味着我的用户计数很可能会精确到秒,并且不会超过 15 到 30 秒。
生成该 redis 函数的代码如下所示:
def = require("promised-io/promise").Deferred
reconcileSha = ->
reconcileFunction = "
local keys_to_remove = redis.call('KEYS', 'user:*')
for i=1, #keys_to_remove do
redis.call('DEL', keys_to_remove[i])
end
local processes = redis.call('KEYS', 'process:*')
for i=1, #processes do
local users_in_process = redis.call('LRANGE', processes[i], 0, -1)
for j=1, #users_in_process do
redis.call('INCR', 'user:' .. users_in_process[j])
end
end
"
dfd = new def()
RedisClient.script 'load', reconcileFunction, (err, res) ->
dfd.resolve(res)
dfd.promise
然后我可以稍后在我的脚本中使用它:
reconcileSha().then (sha) ->
RedisClient.evalsha sha, 0, (err, res) ->
# do stuff
我做的最后一件事是尝试处理一些关闭事件,以确保进程尝试最好不要依赖 redis 超时并真正优雅地关闭。
gracefulShutdown = (callback) ->
console.log "shutdown"
reconcileSha().then (sha) ->
RedisClient.del("process:#processId")
RedisClient.evalsha sha, 0, (err, res) ->
callback() if callback?
# For ctrl-c
process.once 'SIGINT', ->
gracefulShutdown ->
process.kill(process.pid, 'SIGINT')
# For nodemon
process.once 'SIGUSR2', ->
gracefulShutdown ->
process.kill(process.pid, 'SIGUSR2')
到目前为止,它运行良好。
我仍然想做的一件事是让 redis 函数返回任何已更改其值的键。这样,如果特定用户的计数发生了变化,而没有任何服务器主动知道(比如进程死亡),我实际上可以发送一个事件。现在,我必须依靠再次轮询 user:* 值才能知道它已更改。它有效,但它可能会更好......
【讨论】:
这是一个有趣的实现。如果您连接了 10k+ 个客户端,您是否担心 30 秒 ping 的成本? 并非如此。它还没有被测试到那种程度。 Redis 正在成为我的应用程序中最重要的辅助组件,因此服务器将获得保持运行所需的资源。如果我能看到应用程序实例没有发生太多崩溃,我可能会采取另一种成本不高的方法。【参考方案2】:我通过让每台服务器定期在 redis 中设置一个用户计数来解决这个问题,其中包含他们自己的 pid:
大家都做setex userCount:<pid> <interval+10> <count>
然后状态服务器可以查询每个键,然后获取每个键的值:
对于每个keys userCount*
做总计+=get <key>
所以如果服务器崩溃或关闭,那么它的计数将在间隔+10 后从 redis 中删除
对丑陋的伪代码感到抱歉。 :)
【讨论】:
您如何获得每个服务器用户的计数? io.sockets.clients().length 的结果并不总是正确的。例如: 1. 进程 A 正在运行并且有 2 个客户端连接。 io.sockets.clients().length 将正确返回 2。 2. 启动一个新进程 B,并将 2 个客户端连接到它。 B 将返回 2,但是 A 现在将返回 4,因为它已订阅 B 的连接事件。当您尝试重新启动服务器并重新连接客户端时,计数似乎变得更加不准确。 我正在使用 Object.keys(io.sockets.sockets).length,但它似乎在增长而不是准确地缩小,可能与您概述的原因相同。所以我不得不连接到我们的存在系统来获得准确的计数。为此,我们使用 socket.set 将我们的用户对象保存到 redis,然后使用活动或空闲更新该对象。因此,为了计数,我现在正在做的是从 io.sockets.sockets 循环套接字,如果用户的存在状态为“活动”,那么我将它们添加到计数中。【参考方案3】:您可以使用哈希键来存储值。
当用户连接到服务器 1 时,您可以在名为“userCounts”的键上设置一个名为“srv1”的字段。只需使用HSET 覆盖当前计数的值即可。无需增加/减少。只需设置 socket.io 已知的当前值即可。
HSET userCounts srv1 "5"
当另一个用户连接到不同的服务器时,设置不同的字段。
HSET userCounts srv2 "10"
然后任何服务器都可以通过返回“userCounts”中的所有字段并使用HVALS将它们加在一起来返回值列表。
HVALS userCounts
当服务器崩溃时,您需要运行一个脚本来响应崩溃,从 userCounts 中删除该服务器的字段或将其 HSET 为“0”。
您可以查看Forever 自动重启服务器。
【讨论】:
我使用 upstart 来重新启动服务器,这比永远工作要好得多(我已经深入研究了很多)。我试图解释一个完整的服务器故障,这确实不时发生。我确实对此进行了监控(zabbix),但是让 zabbix 在服务器出现故障时通知仪表板对我来说似乎是一个相当大的技巧。 不幸的是,过期仅适用于键而不是单个 hset 字段。但也许您可以通过组合键和字段来解决问题。 我想我现在的计划是使用setex userCounts:<server-pid> <timeout> <count>
,然后状态服务器可以调用keys userCounts*
,然后获取并添加这些密钥。由于它们过期,如果服务器崩溃,它的计数将会下降。【参考方案4】:
当用户连接到聊天室时,您可以自动递增 RedisStore 中的用户计数器。当用户断开连接时,您会减小该值。通过这种方式,Redis 维护了用户数量,并且可供所有服务器访问。
见INCR和DECR
SET userCount = "0"
当用户连接时:
INCR userCount
当用户断开连接时:
DECR userCount
【讨论】:
除非服务器崩溃,否则这些计数变得毫无意义 您可以为每个服务器维护一个单独的计数并将它们相加。如果服务器出现故障,则将该服务器的计数器设置为 0。 这将需要一个单独的进程来跟踪服务器并为它们修复计数。我真的希望有一个纯粹的 socket.io 方法来做到这一点。以上是关于计算水平服务器上的 socket.io 用户的主要内容,如果未能解决你的问题,请参考以下文章
node.js + socket.io + redis 架构 - 水平服务器缩放套接字连接?