使用 Socket.io 和 Redis 的节点集群问题

Posted

技术标签:

【中文标题】使用 Socket.io 和 Redis 的节点集群问题【英文标题】:Node Cluster issue using Socket.io and Redis 【发布时间】:2016-05-15 22:26:33 【问题描述】:

好的,我有一个快速供电的 API,我还运行了 socket.io 来接收/发送实时事件……一切都很好。我需要集群我的应用程序。我根据以下代码设置了所有内容。我启动了工人,他们得到了连接,一切正常,除了现在我不能“爆炸”到所有 socket.io 连接的事实。这是设置(取自this):

var express = require('express'),
    cluster = require('cluster'),
    net = require('net'),
    sio = require('socket.io'),
    sio_redis = require('socket.io-redis');

var port = 3000,
    num_processes = require('os').cpus().length;

if (cluster.isMaster) 
    // This stores our workers. We need to keep them to be able to reference
    // them based on source IP address. It's also useful for auto-restart,
    // for example.
    var workers = [];

    // Helper function for spawning worker at index 'i'.
    var spawn = function(i) 
        workers[i] = cluster.fork();

        // Optional: Restart worker on exit
        workers[i].on('exit', function(worker, code, signal) 
            console.log('respawning worker', i);
            spawn(i);
        );
    ;

    // Spawn workers.
    for (var i = 0; i < num_processes; i++) 
        spawn(i);
    

    // Helper function for getting a worker index based on IP address.
    // This is a hot path so it should be really fast. The way it works
    // is by converting the IP address to a number by removing the dots,
    // then compressing it to the number of slots we have.
    //
    // Compared against "real" hashing (from the sticky-session code) and
    // "real" IP number conversion, this function is on par in terms of
    // worker index distribution only much faster.
    var workerIndex = function (ip, len) 
    var _ip = ip.split(/['.'|':']/),
        arr = [];

    for (el in _ip) 
        if (_ip[el] == '') 
            arr.push(0);
        
        else 
            arr.push(parseInt(_ip[el], 16));
        
    

    return Number(arr.join('')) % len;


    // Create the outside facing server listening on our port.
    var server = net.createServer( pauseOnConnect: true , function(connection) 
        // We received a connection and need to pass it to the appropriate
        // worker. Get the worker for this connection's source IP and pass
        // it the connection.
        var worker = workers[worker_index(connection.remoteAddress, num_processes)];
        worker.send('sticky-session:connection', connection);
    ).listen(port);
 else 
    // Note we don't use a port here because the master listens on it for us.
    var app = new express();

    // Here you might use middleware, attach routes, etc.

    // Don't expose our internal server to the outside.
    var server = app.listen(0, 'localhost'),
        io = sio(server);

    // Tell Socket.IO to use the redis adapter. By default, the redis
    // server is assumed to be on localhost:6379. You don't have to
    // specify them explicitly unless you want to change them.
    io.adapter(sio_redis( host: 'localhost', port: 6379 ));

    // Here you might use Socket.IO middleware for authorization etc.

    // Listen to messages sent from the master. Ignore everything else.
    process.on('message', function(message, connection) 
        if (message !== 'sticky-session:connection') 
            return;
        

        // Emulate a connection event on the server by emitting the
        // event with the connection the master sent us.
        server.emit('connection', connection);

        connection.resume();
    );

所以我从各种机器连接以测试并发性,工作人员做他们的事情并且一切都很好,但是当我获得 IO 连接时,我正在记录“已连接”总数,并且每个实例始终为 1。我需要一种表达方式

allClusterForks.emit(stuff)

我在正确的工作进程 pid 上获得了连接,但“ALL CONNECTIONS”总是返回 1。

io.on('connection', function(socket) 
    console.log('Connected to worker %s', process.pid);
    console.log("Adapter ROOMS %s ", io.sockets.adapter.rooms);
    console.log("Adapter SIDS %s ", io.sockets.adapter.sids);
    console.log("SOCKETS CONNECTED %s ", Object.keys(io.sockets.connected).length);
);

我可以使用 Redis MONITOR 看到订阅/取消订阅

1454701383.188231 [0 127.0.0.1:63150] "subscribe" "socket.io#/#gXJscUUuVQGzsYJfAAAA#"
1454701419.130100 [0 127.0.0.1:63167] "subscribe" "socket.io#/#geYSvYSd5zASi7egAAAA#"
1454701433.842727 [0 127.0.0.1:63167] "unsubscribe" "socket.io#/#geYSvYSd5zASi7egAAAA#"
1454701444.630427 [0 127.0.0.1:63150] "unsubscribe" "socket.io#/#gXJscUUuVQGzsYJfAAAA#"

这些是来自 2 台不同机器的连接,我希望通过使用 socket io redis 适配器,这些订阅将来自同一个 redis 连接,但它们是不同的。

我只是完全错过了什么吗?令人惊讶的是,没有完全过时/错误/模棱两可的文档/文章。

编辑: 节点 v5.3.0 Redis v3.0.6 Socket.io v1.3.7

【问题讨论】:

【参考方案1】:

因此,如果有人遇到这种情况,我发现实际上“查看”跨进程连接的套接字的数量并不是一件事,而是广播或发送给它们。所以我基本上只是无缘无故地“测试”。一切都按预期工作。我将重写 socket.io-redis 适配器以允许跨进程检查计数。

几年前有一个拉取请求来实现对我正在尝试做的事情的支持。 https://github.com/socketio/socket.io-redis/pull/15 我可能会尝试清理并重新提交。

【讨论】:

以上是关于使用 Socket.io 和 Redis 的节点集群问题的主要内容,如果未能解决你的问题,请参考以下文章

在 PHP 中使用 Node.js、Socket.io、Redis 的私人聊天消息

将 pm2 的集群模块与 socket.io 和 socket.io-redis 一起使用

使用 Redis 和 Socket.io 进行用户认证

使用 Redis 扩展 Socket.IO,然后扩展 Redis 本身

没有使用这个基本的 socket.io + redis 设置接收事件?

如何在前端获取 socket.on 函数?事件被触发和处理。我正在使用 socket.io、NodeJS 服务器和 Redis.io