SpringBoot使用WebSocket实现服务端推送--集群实现
Posted 恒奇恒毅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot使用WebSocket实现服务端推送--集群实现相关的知识,希望对你有一定的参考价值。
书接上文,本文介绍了一种实现集群管理和消息传送方式。
在集群模式情况下,一般是nginx反向代理到多台Tomcat或者SLB代理到多台Tomcat的方式,怎么实现给某个人推送消息?比如WebSocket1连接到Tomcat1,但是在Tomcat2需要给WebSocket1发送消息,怎么办?一般的想法是像httpsession的集群处理方式一样,利用一个中间件Redis来保存session即可。但是实际测试才发现,根本不可取,因为WebSocket的session是有状态的,并且无法序列化,在往redis中保存的时候就抛异常了。通过查询资料,发现可以通过Redis的发布订阅模式来实现。其基本原理是:初始化的时候都订阅某个频道,Tomcat只管理连接到我的WebSocket的session,需要给某人发布消息的时候通过Redis发布一个消息,所有订阅了该频道的Tomcat都能接收到该消息,根据此消息找WebSocket,能找到就发送消息,不能找到忽略即可。
具体实现:
1.首先实现WebSocketManager,因为连接到本机的WebSocket还是本机管理,所以继承于MemWebSocketManager实现,只需要实现发送消息相关方法。
/**
* WebSocket的session无法序列化,所以session还是保存在本地内存中,发送消息这种就走订阅发布模式
* 1.redis或者mq进行发布订阅,广播->有某个节点能找到此人就发送消息,其他的忽略
* 2.Nginx进行IP hash 可以使用@link MemWebSocketManager
* @author xiongshiyan at 2018/10/10 , contact me with email yanshixiong@126.com or phone 15208384257
*/
public class RedisWebSocketManager extends MemWebSocketManager
public static final String CHANNEL = "websocket";
private static final String COUNT_KEY = "CountKey";
private StringRedisTemplate stringRedisTemplate;
public RedisWebSocketManager(StringRedisTemplate stringRedisTemplate)
this.stringRedisTemplate = stringRedisTemplate;
@Override
public void put(String identifier, WebSocket webSocket)
super.put(identifier, webSocket);
//在线数量加1
countChange(1);
@Override
public void remove(String identifier)
super.remove(identifier);
//在线数量减1
countChange(-1);
@Override
public int size()
return getCount();
@Override
public void sendMessage(String identifier, String message)
WebSocket webSocket = get(identifier);
//本地能找到就直接发
if(null != webSocket && WebSocket.STATUS_AVAILABLE == webSocket.getStatus())
WebSocketUtil.sendMessage(webSocket.getSession() , message);
return;
Map<String , Object> map = new HashMap<>(3);
map.put(RedisReceiver.ACTION , ActionFactory.ACTION_SEND_MESSAGE);
map.put(RedisReceiver.IDENTIFIER , identifier);
map.put("message" , message);
//在websocket频道上发布发送消息的消息
stringRedisTemplate.convertAndSend(CHANNEL , JsonUtil.serializeMap(map));
@Override
public void broadcast(String message)
Map<String , Object> map = new HashMap<>(2);
map.put(RedisReceiver.ACTION , ActionFactory.ACTION_BROADCAST_MESSAGE);
map.put("message" , message);
//在websocket频道上发布广播的消息
stringRedisTemplate.convertAndSend(CHANNEL , JsonUtil.serializeMap(map));
@Override
public void changeStatus(String identifier, int status)
WebSocket webSocket = get(identifier);
if(null != webSocket)
webSocket.setStatus(status);
return;
Map<String , Object> map = new HashMap<>(3);
map.put(RedisReceiver.ACTION , ActionFactory.ACTION_CHANGE_STATUS);
map.put(RedisReceiver.IDENTIFIER , identifier);
map.put("status" , status);
//在websocket频道上发布改变状态的消息
stringRedisTemplate.convertAndSend(CHANNEL , JsonUtil.serializeMap(map));
/**
* 增减在线数量
*/
private void countChange(int delta)
ValueOperations<String, String> value = stringRedisTemplate.opsForValue();
//获取在线当前数量
int count = getCount(value);
count = count + delta;
count = count > 0 ? count : 0;
//设置新的数量
value.set(COUNT_KEY , "" + count);
/**
* 获取当前在线数量
*/
private int getCount()
ValueOperations<String, String> value = stringRedisTemplate.opsForValue();
return getCount(value);
private int getCount(ValueOperations<String, String> value)
String countStr = value.get(COUNT_KEY);
int count = 0;
if(null != countStr)
count = Integer.parseInt(countStr);
return count;
该类中,重写了所有需要操作某个session的方法,在这些方法中指定不同的操作Action及带上相应的数据。
2.订阅者收到改消息及能做出不同的动作,订阅者持有WebSocketManager,可以操作相关的session。
/**
* redis消息订阅者
* @author xiongshiyan
*/
public class RedisReceiver
private static final Logger LOGGER = LoggerFactory.getLogger(RedisReceiver.class);
public static final String IDENTIFIER = "identifier";
public static final String ACTION = "action";
private CountDownLatch latch;
private WebSocketManager webSocketManager;
public RedisReceiver(WebSocketManager webSocketManager, CountDownLatch latch)
this.webSocketManager = webSocketManager;
this.latch = latch;
/**
* 此方法会被反射调用
*/
public void receiveMessage(String message)
LOGGER.info(message);
JSONObject object = new JSONObject(message);
if(!object.containsKey(ACTION))
return;
String actionString = object.getString(ACTION);
Action action = ActionFactory.create(actionString);
action.doMessage(this.webSocketManager , object);
//接收到消息要做的事情
latch.countDown();
消息订阅者接收到消息之后,根据Action得到操作类,实现不同的操作,如果以后有更多的功能,那么添加相应的Action类即可。
/**
* @author xiongshiyan at 2018/10/12 , contact me with email yanshixiong@126.com or phone 15208384257
*/
public class ActionFactory
public static final String ACTION_SEND_MESSAGE = "sendMessage";
public static final String ACTION_CHANGE_STATUS = "changeStatus";
public static final String ACTION_BROADCAST_MESSAGE = "broadcast";
public static Action create(String action)
if(ACTION_SEND_MESSAGE.equalsIgnoreCase(action))
return new SendMessageAction();
else if(ACTION_CHANGE_STATUS.equalsIgnoreCase(action))
return new ChangeStatusAction();
else if(ACTION_BROADCAST_MESSAGE.equalsIgnoreCase(action))
return new BroadCastAction();
else
return new NoActionAction();
/**
*
* "action":"sendMessage",
* "identifier":"xxx",
* "message":"xxxxxxxxxxx"
*
* 给webSocket发送消息的action
* @author xiongshiyan at 2018/10/12 , contact me with email yanshixiong@126.com or phone 15208384257
*/
public class SendMessageAction implements Action
private static final String MESSAGE = "message";
@Override
public void doMessage(WebSocketManager manager, JSONObject object)
if(!object.containsKey(RedisReceiver.IDENTIFIER))
return;
if(!object.containsKey(MESSAGE))
return;
String identifier = object.getString(RedisReceiver.IDENTIFIER);
WebSocket webSocket = manager.get(identifier);
if(null == webSocket || WebSocket.STATUS_AVAILABLE != webSocket.getStatus())
return;
WebSocketUtil.sendMessage(webSocket.getSession() , object.getString(MESSAGE));
/**
*
* "action":"changeStatus",
* "identifier":"xxx",
* "status":1
*
* 改变状态的action
* @author xiongshiyan at 2018/10/12 , contact me with email yanshixiong@126.com or phone 15208384257
*/
public class ChangeStatusAction implements Action
private static final String STATUS = "status";
@Override
public void doMessage(WebSocketManager manager , JSONObject object)
if(!object.containsKey(RedisReceiver.IDENTIFIER))
return;
if(!object.containsKey(STATUS))
return;
WebSocket webSocket = manager.get(object.getString(RedisReceiver.IDENTIFIER));
if(null == webSocket)
return;
webSocket.setStatus(object.getInteger(STATUS));
/**
*
* "action":"broadcast",
* "message":"xxxxxxxxxxxxx"
*
* 广播给所有的websocket发送消息 action
* @author xiongshiyan at 2018/10/12 , contact me with email yanshixiong@126.com or phone 15208384257
*/
public class BroadCastAction implements Action
@Override
public void doMessage(WebSocketManager manager, JSONObject object)
String message = object.getString("message");
//从本地取出所有的websocket发送消息
manager.localWebSocketMap().values().forEach(
webSocket -> WebSocketUtil.sendMessage(
webSocket.getSession() , message));
/**
* do nothing action
* @author xiongshiyan at 2018/10/12 , contact me with email yanshixiong@126.com or phone 15208384257
*/
public class NoActionAction implements Action
@Override
public void doMessage(WebSocketManager manager, JSONObject object)
// do no thing
3.配置WebSocketManager和消息订阅。
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory)
return new StringRedisTemplate(connectionFactory);
/**
* 使用redis管理,具备集群功能
*/
@Bean(WebSocketManager.WEBSOCKET_MANAGER_NAME)
public RedisWebSocketManager webSocketManager(@Autowired StringRedisTemplate stringRedisTemplate)
return new RedisWebSocketManager(stringRedisTemplate);
**
* @author xiongshiyan
* redis管理websocket配置
*/
@Configuration
@ConditionalOnBean(RedisWebSocketManager.class)
public class RedisWebSocketConfig
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter)
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic(RedisWebSocketManager.CHANNEL));
return container;
@Bean
public RedisReceiver receiver(
@Autowired@Qualifier("webSocketManager") WebSocketManager webSocketManager,
@Autowired@Qualifier("latch") CountDownLatch latch)
return new RedisReceiver(webSocketManager , latch);
@Bean
public MessageListenerAdapter listenerAdapter(RedisReceiver receiver)
return new MessageListenerAdapter(receiver, "receiveMessage");
@Bean
public CountDownLatch latch()
return new CountDownLatch(1);
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory)
return new StringRedisTemplate(connectionFactory);
至此,实现了WebSocket的集群管理。
项目参见 https://gitee.com/xxssyyyyssxx/websocket-springboot-starter 目前支持多ServerEndPoint和多WebSocketManager,一般情况下他们的关系是一对一的,便于管理。
使用demo见 https://gitee.com/xxssyyyyssxx/websocket-demo
以上是关于SpringBoot使用WebSocket实现服务端推送--集群实现的主要内容,如果未能解决你的问题,请参考以下文章