在 redis 订阅事件发生后,如何加入套接字 io 房间?
Posted
技术标签:
【中文标题】在 redis 订阅事件发生后,如何加入套接字 io 房间?【英文标题】:How do I join a socket io room AFTER a redis subscription event happens? 【发布时间】:2021-12-09 22:29:06 【问题描述】:我正在使用 redis pub/sub,我想在 redis sub 事件之后加入一个房间:
const express = require('express');
const app = express();
const socketio = require('socket.io');
const redis = require("redis");
const expressServer = app.listen(3001, () => console.log("Express is running!"));
const io = socketio(expressServer);
const sub = redis.createClient(port: 6379, host: '127.0.0.1');
sub.subscribe('message');
let room;
sub.on('error', function (error)
console.log('ERROR ' + error);
);
sub.on('connect', function()
console.log('Redis client connected');
);
//Redis sub scribe to message conversation channel
sub.on('message', async function (channel, data)
data = JSON.parse(data);
if(channel === 'conversation')
room = data.id;
console.log('room set ' + room)
);
io.on('connect', (socket) =>
socket.emit('test', 'from server')
//Room I'd like to join
socket.join(room);
io.of("/").adapter.on("join-room", (room, id) =>
console.log(`socket $id has joined room $room`);
);
);
基本上我想要在sub.on('message')
上设置room
状态,然后在我想要socket.join(room);
之后。现在发生的事情是套接字服务器的连接速度比 redis 子事件快。在第一个子事件中,socket 服务器加入了一个未定义的房间,然后它加入了sub.on(message)
设置的最后一个房间
有没有办法可以使用节点事件发射器,以便在 redis 设置房间后加入套接字房间?或者我将如何使用 socket.io.reids.adapter 来做到这一点?
It looks like there's a way to do this with new redis socket.io adapter 函数,但没有关于如何做的文档?函数:RedisAdapter.remoteJoin(id, room)
编辑
我开始尝试使用带有套接字适配器的remoteJoin
执行此操作。问题是socket.id
const express = require('express');
const app = express();
const socketio = require('socket.io');
const redis = require("redis");
const redisAdapter = require('@socket.io/redis-adapter');
const expressServer = app.listen(3001, () => console.log("Express is running!"));
const io = socketio(expressServer);
const sub = redis.createClient(port: 6379, host: '127.0.0.1');
const pub = sub.duplicate();
io.adapter(redisAdapter(pub, sub));
sub.subscribe('conversation','message', function()
// console.log('Subbed')
)
//sub.subscribe('conversation','message');
let room;
let soketId;
sub.on('error', function (error)
console.log('ERROR ' + error);
);
sub.on('connect', function()
console.log('Redis client connected');
);
sub.on('message', async function (channel, data)
data = JSON.parse(data);
if(channel === 'conversation')
room = data.id;
io.of('/').adapter.remoteJoin(socketId, room);
console.log('subbed ' + room)
);
io.on('connect', (socket) =>
socketId = socket.id;
io.of("/").adapter.on("join-room", (room, id) =>
console.log(`socket $id has joined room $room`);
);
);
问题是socketId = socket.id;
设置为最后一个连接的客户端。所以所有客户端都成为最后一个连接的客户端。
【问题讨论】:
【参考方案1】:我已重构您的代码以响应来自 Redis pub/sub 的传入消息
可以使用新的 Redis 客户端或具有基于 Promise 的 API 的 ioredis 客户端来减少此代码。
每当有新客户端连接时,您都需要在 Redis 上订阅消息,以便在客户端连接时对新消息做出反应。
问题是当套接字断开时,您不再需要接收这些消息,所以我创建了一个辅助函数,它执行以下操作:
连接到redis 创建订阅 当有新消息到达时,它会调用回调函数 在调用时返回一个函数取消订阅消息并断开当前 Redis 客户端。
const express = require('express');
const app = express();
const socketio = require('socket.io');
const redis = require('redis');
const expressServer = app.listen(3001, () => console.log('Express is running!'));
const io = socketio(expressServer);
const createSubscriber = async cb =>
const client = redis.createClient( port: 6379, host: '127.0.0.1' );
await new Promise((resolve, reject) =>
client.on('connect', resolve);
client.on('error', reason =>
client.end();
reject(reason);
);
);
await new Promise((resolve, reject) =>
client.subscribe('message', err =>
if (err)
client.end();
reject(err);
else
resolve();
);
);
client.on('message', (channel, message) =>
if (channel === 'conversation')
cb(JSON.parse(message));
);
return async () =>
await new Promise(resolve =>
client.unsubscribe('message', err =>
if (err)
console.error(err);
client.end();
resolve();
);
);
;
;
io.on('connect', async socket =>
socket.emit('test', 'from server');
try
const onDisconnected = await createSubscriber(data =>
socket.join(data.id);
);
socket.on('disconnect', async () =>
await onDisconnected();
);
catch (error)
console.error(error); // redis has failed to connect
socket.emit('error', error.message);
socket.disconnect(); // disconnect the socket since there is nothing to do
);
io.of('/').adapter.on('join-room', (room, id) =>
console.log(`socket $id has joined room $room`);
);
【讨论】:
感谢您的帮助,但现在的问题是所有客户都加入了同一个房间。我之前尝试将我的子方法放入连接中,结果相同。这就是为什么我选择使用:io.of('/').adapter.remoteJoin(socketId, room);
在我的编辑中,但我认为你是对的,redis 连接/断开应该发生在套接字连接/断开中
如果你想将一个特定的socket ID加入到一个特定的房间ID,你需要发布socketID并且你需要将socketIDs放在某种数据库上,我通常将它们存储在redis上。如果是这种情况,您不需要在每次套接字连接时创建 Redis 订阅。由于 io.of('/').adapter.remoteJoin(socketId, room);可以在全局范围内完成,当一个套接字连接时,你需要做的就是将套接字 ID 存储在 db 和断开连接时,从 redis 中删除该套接字 ID,就是这样!以上是关于在 redis 订阅事件发生后,如何加入套接字 io 房间?的主要内容,如果未能解决你的问题,请参考以下文章