SpringBoot 整合 WebSocket 服务代码教程
Posted 洛阳泰山
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot 整合 WebSocket 服务代码教程相关的知识,希望对你有一定的参考价值。
在 pom 文件中引入依赖(示例排除了内嵌 Tomcat,此时项目需另行引入 Undertow、Jetty 等 Web 容器;如使用默认的 Tomcat,请去掉 exclusions 部分):
<!-- WebSocket starter; the embedded Tomcat starter is excluded from its transitive dependencies -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
    </exclusions>
</dependency>
添加WebSocketConfig配置类
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springblade.constant.WebSocketConstant;
import org.springblade.core.secure.BladeUser;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.StringUtil;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
import java.util.Objects;
@Slf4j
@Configuration
@EnableWebSocketMessageBroker
@ConditionalOnProperty(name = "websocket.enabled", havingValue = "true")
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer
/**
* 方法描述: 端点配置
*
* @param registry
* @Return
* @author caichengzhe
* @date 2021年07月08日 10:43:55
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry)
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
/**
* 方法描述: 消息传输格式配置
*
* @param registry
* @Return
* @author caichengzhe
* @date 2021年07月08日 10:43:38
*/
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry)
registry.setMessageSizeLimit(500 * 1024 * 1024);//500Mb
registry.setSendBufferSizeLimit(1024 * 1024 * 1024);//1Gb
registry.setSendTimeLimit(200000);//200s
/**
* 方法描述: 入界通道拦截,此处用来进行身份验证与用户信息设置
*
* @param registration
* @Return
* @author caichengzhe
* @date 2021年07月08日 10:43:15
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration)
registration.interceptors(new ChannelInterceptor()
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel)
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand()))
String token = accessor.getFirstNativeHeader(WebSocketConstant.WEBSOCKET_HEADER_TOKEN);
String userId = accessor.getFirstNativeHeader(WebSocketConstant.WEBSOCKET_HEADER_USER_ID);
if (StringUtil.isAnyBlank(token, userId))
return null;
//进行双重校验,两参数非空,且token中解析的用户id应于header传递的一致,否则视为非法请求
//websocket此处不再进行登录功能,均采用jwttoken进行身份验证
BladeUser bu = AuthUtil.getUser(token);
if (Objects.isNull(bu) || !StringUtil.equals(String.valueOf(bu.getUserId()), userId))
return null;
log.info("用户:" + bu.getRealName() + "; userId:" + bu.getUserId() + " 连接成功");
accessor.setUser(() -> userId);
return message;
);
/**
* 方法描述: 出界通道拦截配置,此处用来进行日志信息收集
*
* @param registration
* @Return
* @author caichengzhe
* @date 2021年07月08日 10:44:06
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration)
registration.interceptors(new ChannelInterceptor()
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent)
log.info("发出消息:" + JSON.toJSONString(message.getPayload()) + ";发送结果:" + (sent ? "成功" : "失败") + ";消息通道:" + channel);
);
@Override
public void configureMessageBroker(MessageBrokerRegistry registry)
//total message mapping include broadcast and specific topic
registry.enableSimpleBroker("/topic/broadcast", "/queue");
// client-to-server message mapping prefix
registry.setApplicationDestinationPrefixes("/unify-ws");
// server to specific client mapping prefix
registry.setUserDestinationPrefix("/user");
或者使用简单配置(基于 @ServerEndpoint 注解的方式,下文的 WebSocketService 依赖此配置;该类与上面的配置类同名,二者选其一即可):
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig
/**
* 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter()
return new ServerEndpointExporter();
WebSocketService 服务类
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@ServerEndpoint(value = "/point/change")
@Component
@Api(value = "点位实时数据变更", tags = "点位实时数据变更")
public class WebSocketService
/** concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。*/
private static ConcurrentHashMap<String, WebSocketService> webSocketMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, String> messageMap = new ConcurrentHashMap<>();
/**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
private Session session;
/** 记录当前在线连接数 */
private static AtomicInteger onlineCount = new AtomicInteger(0);
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session)
this.session = session;
String sessionId = session.getId();
if(webSocketMap.containsKey(sessionId))
//加入set中
webSocketMap.remove(sessionId);
webSocketMap.put(sessionId,this);
else
//加入set中
webSocketMap.put(sessionId,this);
// 在线数加1
onlineCount.incrementAndGet();
log.info("有新连接加入:,当前在线人数为:", session.getId(), onlineCount.get());
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose()
if(webSocketMap.containsKey(session.getId()))
webSocketMap.remove(session.getId());
//从set中删除
onlineCount.decrementAndGet(); // 在线数减1
log.info("有一连接关闭:,当前在线人数为:", session.getId(), onlineCount.get());
/**
* 收到客户端消息后调用的方法
*
* @param message
* 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session)
messageMap.remove(session.getId());
messageMap.put(session.getId(),message);
log.info("服务端收到客户端[]的消息:", session.getId(), message);
// this.sendMessage("Hello, " + message, session);
@OnError
public void onError(Session session, Throwable error)
log.error("发生错误");
error.printStackTrace();
/**
* 服务端发送消息给客户端
*/
public void sendMessage(String message, Session toSession)
try
log.info("服务端给客户端[]发送消息", toSession.getId(), message);
toSession.getBasicRemote().sendText(message);
catch (Exception e)
log.error("服务端发送消息给客户端失败:", e);
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException
synchronized (session)
this.session.getBasicRemote().sendText(message);
public static ConcurrentHashMap<String, WebSocketService> getWebSocketMap()
return webSocketMap;
public static ConcurrentHashMap<String,String> getMessageMap()
return messageMap;
发送使用代码示例
// Class-level field: the WebSocketService bean injected by the container.
@Resource
private WebSocketService webSocketService;

// Inside a method: push the job messages as a JSON string when there are any.
// sendMessage(String) declares IOException, so the enclosing method must handle or declare it.
if (Func.isNotEmpty(jobMessages)) {
    webSocketService.sendMessage(JSONObject.toJSONString(jobMessages));
}
以上是关于 SpringBoot 整合 WebSocket 服务代码教程的主要内容,如果未能解决你的问题,请参考以下文章: