在 redis 订阅事件发生后,如何加入套接字 io 房间?

Posted

技术标签:

【中文标题】在 redis 订阅事件发生后,如何加入套接字 io 房间?【英文标题】:How do I join a socket io room AFTER a redis subscription event happens? 【发布时间】:2021-12-09 22:29:06 【问题描述】:

我正在使用 redis pub/sub,我想在 redis sub 事件之后加入一个房间:

const express = require('express');
const app = express();
const socketio = require('socket.io');
const redis = require("redis");

const expressServer = app.listen(3001, () => console.log("Express is running!"));

const io = socketio(expressServer);

const sub = redis.createClient(port: 6379, host: '127.0.0.1');

sub.subscribe('message');


let room;

sub.on('error', function (error) 
    console.log('ERROR ' + error);
);

sub.on('connect', function()
    console.log('Redis client connected');
);

//Redis sub scribe to message conversation channel
sub.on('message', async function (channel, data) 
    data = JSON.parse(data);
    if(channel === 'conversation')
        room = data.id;
        console.log('room set ' + room)
    
);

io.on('connect', (socket) => 
    socket.emit('test', 'from server')

    //Room I'd like to join
    socket.join(room);

    io.of("/").adapter.on("join-room", (room, id) => 
        console.log(`socket $id has joined room $room`);
    );
);

基本上我想要在sub.on('message') 上设置room 状态,然后在我想要socket.join(room); 之后。现在发生的事情是套接字服务器的连接速度比 redis 子事件快。在第一个子事件中,socket 服务器加入了一个未定义的房间,然后它加入了sub.on(message) 设置的最后一个房间

有没有办法可以使用节点事件发射器,以便在 redis 设置房间后加入套接字房间?或者我将如何使用 socket.io.reids.adapter 来做到这一点?

It looks like there's a way to do this with new redis socket.io adapter 函数,但没有关于如何做的文档?函数:RedisAdapter.remoteJoin(id, room)

编辑

我开始尝试使用带有套接字适配器的remoteJoin 执行此操作。问题是socket.id

const express = require('express');
const app = express();
const socketio = require('socket.io');
const redis = require("redis");
const redisAdapter = require('@socket.io/redis-adapter');

const expressServer = app.listen(3001, () => console.log("Express is running!"));

const io = socketio(expressServer);

const sub = redis.createClient(port: 6379, host: '127.0.0.1');
const pub = sub.duplicate();

io.adapter(redisAdapter(pub, sub));

sub.subscribe('conversation','message', function()
    // console.log('Subbed')
)

//sub.subscribe('conversation','message');

let room;
let soketId;

sub.on('error', function (error) 
    console.log('ERROR ' + error);
);

sub.on('connect', function()
    console.log('Redis client connected');
);


sub.on('message', async function (channel, data) 
     data = JSON.parse(data);
     if(channel === 'conversation')
        room = data.id;

        io.of('/').adapter.remoteJoin(socketId, room);

        console.log('subbed ' + room)
    
);

io.on('connect', (socket) => 
    socketId = socket.id;

    io.of("/").adapter.on("join-room", (room, id) => 
        console.log(`socket $id has joined room $room`);
    );
);

问题是socketId = socket.id; 设置为最后一个连接的客户端。所以所有客户端都成为最后一个连接的客户端。

【问题讨论】:

【参考方案1】:

我已重构您的代码以响应来自 Redis pub/sub 的传入消息

可以使用新的 Redis 客户端或具有基于 Promise 的 API 的 ioredis 客户端来减少此代码。

每当有新客户端连接时,您都需要在 Redis 上订阅消息,以便在客户端连接时对新消息做出反应。

问题是当套接字断开时,您不再需要接收这些消息,所以我创建了一个辅助函数,它执行以下操作:

连接到redis 创建订阅 当有新消息到达时,它会调用回调函数 在调用时返回一个函数取消订阅消息并断开当前 Redis 客户端。

const express = require('express');
const app = express();
const socketio = require('socket.io');
const redis = require('redis');

const expressServer = app.listen(3001, () => console.log('Express is running!'));

const io = socketio(expressServer);

const createSubscriber = async cb => 
    const client = redis.createClient( port: 6379, host: '127.0.0.1' );

    await new Promise((resolve, reject) => 
        client.on('connect', resolve);
        client.on('error', reason => 
            client.end();
            reject(reason);
        );
    );

    await new Promise((resolve, reject) => 
        client.subscribe('message', err => 
            if (err) 
                client.end();
                reject(err);
             else 
                resolve();
            
        );
    );

    client.on('message', (channel, message) => 
        if (channel === 'conversation') 
            cb(JSON.parse(message));
        
    );

    return async () => 
        await new Promise(resolve => 
            client.unsubscribe('message', err => 
                if (err) 
                    console.error(err);
                
                client.end();
                resolve();
            );
        );
    ;
;

io.on('connect', async socket => 
    socket.emit('test', 'from server');

    try 
        const onDisconnected = await createSubscriber(data => 
            socket.join(data.id);
        );

        socket.on('disconnect', async () => 
            await onDisconnected();
        );
     catch (error) 
        console.error(error); // redis has failed to connect
        socket.emit('error', error.message);
        socket.disconnect(); // disconnect the socket since there is nothing to do
    
);

io.of('/').adapter.on('join-room', (room, id) => 
    console.log(`socket $id has joined room $room`);
);



【讨论】:

感谢您的帮助,但现在的问题是所有客户都加入了同一个房间。我之前尝试将我的子方法放入连接中,结果相同。这就是为什么我选择使用:io.of('/').adapter.remoteJoin(socketId, room); 在我的编辑中,但我认为你是对的,redis 连接/断开应该发生在套接字连接/断开中 如果你想将一个特定的socket ID加入到一个特定的房间ID,你需要发布socketID并且你需要将socketIDs放在某种数据库上,我通常将它们存储在redis上。如果是这种情况,您不需要在每次套接字连接时创建 Redis 订阅。由于 io.of('/').adapter.remoteJoin(socketId, room);可以在全局范围内完成,当一个套接字连接时,你需要做的就是将套接字 ID 存储在 db 和断开连接时,从 redis 中删除该套接字 ID,就是这样!

以上是关于在 redis 订阅事件发生后,如何加入套接字 io 房间?的主要内容,如果未能解决你的问题,请参考以下文章

redis空间键详解

redis事件

如何使用 SocketIO 订阅套接字通道? [关闭]

Redis 设计与实现

Redis事件

redis线程模型