计算水平服务器上的 socket.io 用户

Posted

技术标签:

【中文标题】计算水平服务器上的 socket.io 用户【英文标题】:Counting socket.io users across horizontal servers 【发布时间】:2012-08-23 06:53:21 【问题描述】:

我有多个使用 redisstore 水平扩展的 socket.io 服务器。我已经有效地设置了房间,并且能够成功地跨服务器广播到房间等。现在我正在尝试构建一个状态页面,而我未能弄清楚的是如何简单地计算连接的用户数量所有服务器。

io.sockets.clients('room') 和 io.sockets.sockets 只会告诉你那台服务器上连接的客户端数量,而不是连接到同一个 RedisStore 的所有服务器。

建议?

谢谢。

【问题讨论】:

为什么不直接查询每台服务器,然后将连接的客户端数量相加? 我也在寻找一种方法来回答这个问题,而不必为此设置某种观察者。不过,FWIW 看起来逻辑是每个服务器都知道连接到所有服务器的所有客户端 - 但也可能有与另一台服务器断开连接的陈旧客户端。看起来 socket.io 认为在其他服务器上修剪陈旧客户端的开销是不值得的,相反,一些服务器只会广播到一些空白处。 【参考方案1】:

这是我使用 Redis 脚本解决它的方法。它需要 2.6 或更高版本,因此目前很可能仍需要编译您自己的实例。

每次进程启动时,我都会生成一个新的 UUID 并将其保留在全局范围内。我可以使用 pid,但这感觉更安全。

# Pardon my coffeescript
processId = require('node-uuid').v4()

当用户连接时(socket.io 连接事件),然后我将该用户的 id 推送到基于该 processId 的用户列表中。我还将该密钥的到期时间设置为 30 秒。

RedisClient.lpush "process:#processId", user._id
RedisClient.expire "process:#processId", 30

当用户断开连接(断开连接事件)时,我将其删除并更新到期时间。

RedisClient.lrem "process:#processId", 1, user._id
RedisClient.expire "process:#processId", 30

我还设置了一个以 30 秒间隔运行的函数,以基本上“ping”该键,使其保持在那里。因此,如果进程确实意外终止,所有这些用户会话将基本上消失。

setInterval ->
  RedisClient.expire "process:#processId", 30
, 30 * 1000

现在是魔术。 Redis 2.6 包含 LUA 脚本,它本质上提供了一种存储过程的功能。它真的很快而且不是很占用处理器(他们将其与“几乎”运行的 C 代码进行比较)。

我的存储过程基本上循环遍历所有进程列表,并创建一个 user:user_id 键及其当前登录总数。这意味着,如果他们使用两个浏览器等登录,它仍然允许我使用逻辑来判断他们是完全断开连接,还是只是其中一个会话。

我在所有进程上每 15 秒运行一次此函数,并且在连接/断开连接事件之后也是如此。这意味着我的用户计数很可能会精确到秒,并且不会超过 15 到 30 秒。

生成该 redis 函数的代码如下所示:

def = require("promised-io/promise").Deferred

reconcileSha = ->
  reconcileFunction = "
    local keys_to_remove = redis.call('KEYS', 'user:*')
    for i=1, #keys_to_remove do
      redis.call('DEL', keys_to_remove[i])
    end

    local processes = redis.call('KEYS', 'process:*')
    for i=1, #processes do
      local users_in_process = redis.call('LRANGE', processes[i], 0, -1)
      for j=1, #users_in_process do
        redis.call('INCR', 'user:' .. users_in_process[j])
      end
    end
  "

  dfd = new def()
  RedisClient.script 'load', reconcileFunction, (err, res) ->
    dfd.resolve(res)
  dfd.promise

然后我可以稍后在我的脚本中使用它:

reconcileSha().then (sha) ->
  RedisClient.evalsha sha, 0, (err, res) ->
    # do stuff

我做的最后一件事是尝试处理一些关闭事件,以确保进程尝试最好不要依赖 redis 超时并真正优雅地关闭。

gracefulShutdown = (callback) ->
  console.log "shutdown"
  reconcileSha().then (sha) ->
    RedisClient.del("process:#processId")
    RedisClient.evalsha sha, 0, (err, res) ->
      callback() if callback?

# For ctrl-c
process.once 'SIGINT', ->
  gracefulShutdown ->
    process.kill(process.pid, 'SIGINT')

# For nodemon
process.once 'SIGUSR2', ->
  gracefulShutdown ->
    process.kill(process.pid, 'SIGUSR2')

到目前为止,它运行良好。

我仍然想做的一件事是让 redis 函数返回任何已更改其值的键。这样,如果特定用户的计数发生了变化,而没有任何服务器主动知道(比如进程死亡),我实际上可以发送一个事件。现在,我必须依靠再次轮询 user:* 值才能知道它已更改。它有效,但它可能会更好......

【讨论】:

这是一个有趣的实现。如果您连接了 10k+ 个客户端,您是否担心 30 秒 ping 的成本? 并非如此。它还没有被测试到那种程度。 Redis 正在成为我的应用程序中最重要的辅助组件,因此服务器将获得保持运行所需的资源。如果我能看到应用程序实例没有发生太多崩溃,我可能会采取另一种成本不高的方法。【参考方案2】:

我通过让每台服务器定期在 redis 中设置一个用户计数来解决这个问题,其中包含他们自己的 pid:

大家都做setex userCount:<pid> <interval+10> <count>

然后状态服务器可以查询每个键,然后获取每个键的值:

对于每个keys userCount* 做总计+=get <key>

所以如果服务器崩溃或关闭,那么它的计数将在间隔+10 后从 redis 中删除

对丑陋的伪代码感到抱歉。 :)

【讨论】:

您如何获得每个服务器用户的计数? io.sockets.clients().length 的结果并不总是正确的。例如: 1. 进程 A 正在运行并且有 2 个客户端连接。 io.sockets.clients().length 将正确返回 2。 2. 启动一个新进程 B,并将 2 个客户端连接到它。 B 将返回 2,但是 A 现在将返回 4,因为它已订阅 B 的连接事件。当您尝试重新启动服务器并重新连接客户端时,计数似乎变得更加不准确。 我正在使用 Object.keys(io.sockets.sockets).length,但它似乎在增长而不是准确地缩小,可能与您概述的原因相同。所以我不得不连接到我们的存在系统来获得准确的计数。为此,我们使用 socket.set 将我们的用户对象保存到 redis,然后使用活动或空闲更新该对象。因此,为了计数,我现在正在做的是从 io.sockets.sockets 循环套接字,如果用户的存在状态为“活动”,那么我将它们添加到计数中。【参考方案3】:

您可以使用哈希键来存储值。

当用户连接到服务器 1 时,您可以在名为“userCounts”的键上设置一个名为“srv1”的字段。只需使用HSET 覆盖当前计数的值即可。无需增加/减少。只需设置 socket.io 已知的当前值即可。

HSET userCounts srv1 "5"

当另一个用户连接到不同的服务器时,设置不同的字段。

HSET userCounts srv2 "10"

然后任何服务器都可以通过返回“userCounts”中的所有字段并使用HVALS将它们加在一起来返回值列表。

HVALS userCounts

当服务器崩溃时,您需要运行一个脚本来响应崩溃,从 userCounts 中删除该服务器的字段或将其 HSET 为“0”。

您可以查看Forever 自动重启服务器。

【讨论】:

我使用 upstart 来重新启动服务器,这比永远工作要好得多(我已经深入研究了很多)。我试图解释一个完整的服务器故障,这确实不时发生。我确实对此进行了监控(zabbix),但是让 zabbix 在服务器出现故障时通知仪表板对我来说似乎是一个相当大的技巧。 不幸的是,过期仅适用于键而不是单个 hset 字段。但也许您可以通过组合键和字段来解决问题。 我想我现在的计划是使用setex userCounts:<server-pid> <timeout> <count>,然后状态服务器可以调用keys userCounts*,然后获取并添加这些密钥。由于它们过期,如果服务器崩溃,它的计数将会下降。【参考方案4】:

当用户连接到聊天室时,您可以自动递增 RedisStore 中的用户计数器。当用户断开连接时,您会减小该值。通过这种方式,Redis 维护了用户数量,并且可供所有服务器访问。

见INCR和DECR

SET userCount = "0"

当用户连接时:

INCR userCount

当用户断开连接时:

DECR userCount

【讨论】:

除非服务器崩溃,否则这些计数变得毫无意义 您可以为每个服务器维护一个单独的计数并将它们相加。如果服务器出现故障,则将该服务器的计数器设置为 0。 这将需要一个单独的进程来跟踪服务器并为它们修复计数。我真的希望有一个纯粹的 socket.io 方法来做到这一点。

以上是关于计算水平服务器上的 socket.io 用户的主要内容,如果未能解决你的问题,请参考以下文章

node.js + socket.io + redis 架构 - 水平服务器缩放套接字连接?

使用 redis 水平扩展 socket.io

如何检测 socket.io 上的断开连接?

Google Glass 上的 Socket.io 客户端

如何跨多个服务器 nodejs 和 socket.io 存储 socket.id

Socket.io 中的身份验证