SpringBoot2.x整合WebSoket
Posted dingwen_blog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot2.x整合WebSoket相关的知识,希望对你有一定的参考价值。
文章目录
## 一、
WebSocket
1.WebSocket
简介
WebSocket是一种通信协议,可在单个TCP连接上进行全双工通信。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以建立持久性的连接,并进行双向数据传输。
WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。
2.WebSocket
特点
- 建立在 TCP 协议之上,服务器端的实现比较容易
- 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器
- 数据格式比较轻量,性能开销小,通信高效
- 可以发送文本,也可以发送二进制数据
- 没有同源限制,客户端可以与任意服务器通信
- 协议标识符是
ws
(如果加密,则为wss
),服务器网址就是 URL
3.WebSocket
属性
以下是 WebSocket 对象的属性。假定我们使用了以上代码创建了 Socket 对象:
属性 | 描述 |
---|---|
Socket.readyState | 只读属性 readyState 表示连接状态,可以是以下值:0 - 表示连接尚未建立。1 - 表示连接已建立,可以进行通信。2 - 表示连接正在进行关闭。3 - 表示连接已经关闭或者连接不能打开。 |
Socket.bufferedAmount | 只读属性 bufferedAmount 已被 send() 放入正在队列中等待传输,但是还没有发出的 UTF-8 文本字节数。 |
4.WebSocket
事件
以下是 WebSocket 对象的相关事件。假定我们使用了以上代码创建了 Socket 对象:
事件 | 事件处理程序 | 描述 |
---|---|---|
open | Socket.onopen | 连接建立时触发 |
message | Socket.onmessage | 客户端接收服务端数据时触发 |
error | Socket.onerror | 通信发生错误时触发 |
close | Socket.onclose | 连接关闭时触发 |
5.WebSocket
方法
以下是 WebSocket 对象的相关方法。假定我们使用了以上代码创建了 Socket 对象:
方法 | 描述 |
---|---|
Socket.send() | 使用连接发送数据 |
Socket.close() | 关闭连接 |
二、SpringBoot2.x整合WebSoket
1. 实现目标
- 私发消息
- 群发消息
- 支持接收离线消息(内存存储)
- 统计未读消息数量
2. 步骤
2.1 新建SpringBoot工程,添加Maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.dw</groupId>
<artifactId>springboot-websocket</artifactId>
<version>1.0</version>
<name>springboot-websocket</name>
<description>springboot-websocket</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.46</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.2项目结构如下
2.3服务端配置
package com.dw.sprboosoc.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* WebSocket 配置类
* dingwen
* 2021/2/20 10:37
**/
@Configuration
public class WebSocketConfig {
/*
* ServerEndpointExporter 会自动注册使用@ServerEndpoint注解声明的websocket endpoint
* @param []
* @return org.springframework.web.socket.server.standard.ServerEndpointExporter
*/
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
WebSocketServer
package com.dw.sprboosoc.service;
import com.alibaba.fastjson.JSON;
import com.dw.sprboosoc.constant.MessageEnum;
import com.dw.sprboosoc.dto.WebSocketMessageDto;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* WebSocket 核心类
* dingwen
* 2021/2/20 10:41
**/
// 添加Bean
@Component
@Slf4j
//访问路径
@ServerEndpoint(value = "/websocket/{sendUserId}")
public class WebSocketServer {
//当前在线连接数,保证线程安全
private static final AtomicInteger currentOnlineNumber = new AtomicInteger();
//concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象
private static final ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
//设置为静态的 公用一个消息map ConcurrentMap为线程安全的map HashMap不安全
private static final ConcurrentMap<String, Map<String, List<WebSocketMessageDto>>> messageMap = new ConcurrentHashMap<>();
/*
*发送消息
* @param [session, message, userId]
* @return void
*/
public void sendMessage(WebSocketMessageDto webSocketMessageDto) throws IOException {
try {
switch (webSocketMessageDto.getMessageEnum()) {
// 广播消息
case all:
sessionPool.values().forEach(se -> {
try {
se.getBasicRemote()
.sendText(webSocketMessageDto.toString());
} catch (IOException e) {
e.printStackTrace();
}
});
log.info("websocket: 广播消息" + webSocketMessageDto);
// 离线
storeOfflineMessage(webSocketMessageDto);
break;
// 私发
case one:
if (judgeUserOnline(webSocketMessageDto.getRecvUserId())) {
// 在线
sessionPool.get(webSocketMessageDto.getRecvUserId())
.getBasicRemote()
.sendText(webSocketMessageDto.toString());
} else {
// 离线
storeOfflineMessage(webSocketMessageDto);
}
log.info("websocket: 私发消息" + webSocketMessageDto);
break;
}
} catch (Exception exception) {
log.error("websocket: 发送消息发生了错误");
}
}
/*
*客户端收到消息
* @param [message]
* @return void
*/
@OnMessage
public void onMessage(String webSocketMessageDtoStr) throws IOException {
WebSocketMessageDto webSocketMessageDto = JSON.parseObject(webSocketMessageDtoStr, WebSocketMessageDto.class);
log.info("websocket:" + webSocketMessageDto.getRecvUserId() + "收到,来自" + webSocketMessageDto.getSendUserId() + "发送的消息" + webSocketMessageDto.getMessage());
sendMessage(webSocketMessageDto);
}
/*
*判断用户是否在线
* @param [recvUserId]
* @return boolean
*/
public boolean judgeUserOnline(String recvUserId) {
boolean flag = !ObjectUtils.isEmpty(sessionPool.get(recvUserId));
String flagStr = flag ? "在线" : "离线";
log.info("websocket: " + recvUserId + ":" + flagStr);
return flag;
}
/*
*用户离线时把消息存储到内存
* @param [recvUserId]
* @return void
*/
public void storeOfflineMessage(WebSocketMessageDto webSocketMessageDto) {
//用户不在线时 第一次给他发消息
if (ObjectUtils.isEmpty(messageMap.get(webSocketMessageDto.getRecvUserId()))) {
Map<String, List<WebSocketMessageDto>> maps = new HashMap<>();
List<WebSocketMessageDto> list = new ArrayList<>();
list.add(webSocketMessageDto);
maps.put(webSocketMessageDto.getRecvUserId(), list);
messageMap.put(webSocketMessageDto.getRecvUserId(), maps);
} else {
//用户不在线时 再次发送消息
Map<String, List<WebSocketMessageDto>> listObject = messageMap.get(webSocketMessageDto.getRecvUserId());
List<WebSocketMessageDto> objects = new ArrayList<>();
if (!ObjectUtils.isEmpty(listObject.get(webSocketMessageDto.getRecvUserId()))) {//这个用户给收消息的这个用户发过消息
//此用户给该用户发送过离线消息(此用户给该用户发过的所有消息)
objects = listObject.get(webSocketMessageDto.getRecvUserId());
//加上这次发送的消息
objects.add(webSocketMessageDto);
//替换原来的map
listObject.put(webSocketMessageDto.getRecvUserId(), objects);
} else {//这个用户没给该用户发送过离线消息
objects.add(webSocketMessageDto);
listObject.put(webSocketMessageDto.getRecvUserId(), objects);
}
messageMap.put(webSocketMessageDto.getRecvUserId(), listObject);
}
}
/*
*成功建立连接后调用
* @param [session, userId]
* @return void
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "sendUserId") String sendUserId) throws IOException {
//成功建立连接后加入
sessionPool.put(sendUserId, session);
//当前在线数量+1
currentOnlineNumber.incrementAndGet();
log.info("websocket:" + sendUserId + "加入连接,当前在线用户" + currentOnlineNumber + "未读消息数:" + getMessageCount(sendUserId));
// 发送离线消息
sendOffLineMessage(sendUserId);
}
/*
* 用户上线时发送离线消息
* @param []
* @return void
*/
@SneakyThrows
public void sendOffLineMessage(String sendUserId) {
if (ObjectUtils.isEmpty(messageMap.get(sendUserId))) {
// 该用户没有离线消息
return;
}
// 当前登录用户有离线消息
//说明在用户没有登录的时候有人给用户发送消息
//该用户所有未收的消息
Map<String, List<WebSocketMessageDto>> lists = messageMap.get(sendUserId);
//对象用户发送的离线消息
List<WebSocketMessageDto> list = lists.get(sendUserId);
if (list != null) {
for (WebSocketMessageDto webSocketMessageDto : list) {
onMessage(JSON.toJSONString(webSocketMessageDto));
}
}
// 删除已发送的消息
removeHasBeenSentMessage(sendUserId, lists);
}
/*
*删除已发送的消息
* @param [sendUserId, map]
* @return void
*/
public void removeHasBeenSentMessage(String sendUserId, Map<String, List<WebSocketMessageDto>> map) {
// map中key(键)的迭代器对象
//用户接收完消息后删除 避免下次继续发送
Iterator iterator = map.keySet().iterator();
while (iterator.hasNext()) {// 循环取键值进行判断
String keys = (String) iterator.next();//键
if (sendUserId.equals(keys)) {
iterator.remove();
}
}
}
/*
*关闭连接时调用
* @param [userId]
* @return void
*/
@OnClose
public void onClose(@PathParam(value = "sendUserId") String sendUserId) {
sessionPool.remove(sendUserId);
currentOnlineNumber.decrementAndGet();
log.info("websocket:" + sendUserId + "断开连接,当前在线用户" + currentOnlineNumber);
}
/*
*发生错误时调用
* @param [session, throwable]
* @return void
*/
@OnError
public void onError(Throwable throwable) {
log.error("websocket: 发生了错误");
throwable.printStackTrace();
}
/**
* 获取该用户未读的消息数量
*/
public int getMessageCount(String recvUserId) {
//获取该用户所有未收的消息
Map<String, List<WebSocketMessageDto>> listMap = messageMap.get(recvUserId);
if (listMap != null) {
List<WebSocketMessageDto> list = listMap.get(recvUserId);
if (list != null) {
return listMap.get(recvUserId).size();
} else {
以上是关于SpringBoot2.x整合WebSoket的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot2.x整合quartz实现多任务定时执行