SpringBoot 整合 WebSocket 服务代码教程

Posted 洛阳泰山

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot 整合 WebSocket 服务代码教程相关的知识,希望对你有一定的参考价值。

pom文件引入依赖

    <!-- WebSocket starter. The embedded Tomcat starter is excluded below, so the
         application presumably supplies another servlet container (for example
         Undertow or Jetty). NOTE(review): confirm which container the project uses;
         drop the exclusion if Tomcat is wanted. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                    <groupId>org.springframework.boot</groupId>
                </exclusion>
            </exclusions>
        </dependency>

添加WebSocketConfig配置类


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springblade.constant.WebSocketConstant;
import org.springblade.core.secure.BladeUser;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.StringUtil;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;

import java.util.Objects;


@Slf4j
@Configuration
@EnableWebSocketMessageBroker
@ConditionalOnProperty(name = "websocket.enabled", havingValue = "true")
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer 


    /**
     * 方法描述: 端点配置
     *
     * @param registry
     * @Return
     * @author caichengzhe
     * @date 2021年07月08日 10:43:55
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) 
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
    


    /**
     * 方法描述: 消息传输格式配置
     *
     * @param registry
     * @Return
     * @author caichengzhe
     * @date 2021年07月08日 10:43:38
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) 
        registry.setMessageSizeLimit(500 * 1024 * 1024);//500Mb
        registry.setSendBufferSizeLimit(1024 * 1024 * 1024);//1Gb
        registry.setSendTimeLimit(200000);//200s
    


    /**
     * 方法描述: 入界通道拦截,此处用来进行身份验证与用户信息设置
     *
     * @param registration
     * @Return
     * @author caichengzhe
     * @date 2021年07月08日 10:43:15
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) 
        registration.interceptors(new ChannelInterceptor() 
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) 
                StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                if (StompCommand.CONNECT.equals(accessor.getCommand())) 
                    String token = accessor.getFirstNativeHeader(WebSocketConstant.WEBSOCKET_HEADER_TOKEN);
                    String userId = accessor.getFirstNativeHeader(WebSocketConstant.WEBSOCKET_HEADER_USER_ID);
                    if (StringUtil.isAnyBlank(token, userId)) 
                        return null;
                    
                    //进行双重校验,两参数非空,且token中解析的用户id应于header传递的一致,否则视为非法请求
                    //websocket此处不再进行登录功能,均采用jwttoken进行身份验证
                    BladeUser bu = AuthUtil.getUser(token);
                    if (Objects.isNull(bu) || !StringUtil.equals(String.valueOf(bu.getUserId()), userId)) 
                        return null;
                    
                    log.info("用户:" + bu.getRealName() + "; userId:" + bu.getUserId() + " 连接成功");
                    accessor.setUser(() -> userId);
                
                return message;
            
        );
    


    /**
     * 方法描述: 出界通道拦截配置,此处用来进行日志信息收集
     *
     * @param registration
     * @Return
     * @author caichengzhe
     * @date 2021年07月08日 10:44:06
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) 
        registration.interceptors(new ChannelInterceptor() 
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) 
                log.info("发出消息:" + JSON.toJSONString(message.getPayload()) + ";发送结果:" + (sent ? "成功" : "失败") + ";消息通道:" + channel);
            
        );
    


    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) 
        //total message mapping include broadcast and specific topic
        registry.enableSimpleBroker("/topic/broadcast", "/queue");
        // client-to-server message mapping prefix
        registry.setApplicationDestinationPrefixes("/unify-ws");
        // server to specific client mapping prefix
        registry.setUserDestinationPrefix("/user");
    

或者使用简单配置(基于 @ServerEndpoint 的原生 WebSocket 方式;该配置类与上面的 STOMP 配置类同名,二者选其一即可)


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;


@Configuration
public class WebSocketConfig 
    /**
     * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() 
        return new ServerEndpointExporter();
    

WebSocketService 服务类

import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;


@Slf4j
@ServerEndpoint(value = "/point/change")
@Component
@Api(value = "点位实时数据变更", tags = "点位实时数据变更")
public class WebSocketService 

    /** concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。*/
    private static ConcurrentHashMap<String, WebSocketService> webSocketMap = new ConcurrentHashMap<>();

    private static ConcurrentHashMap<String, String> messageMap = new ConcurrentHashMap<>();

    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
    private Session session;

    /** 记录当前在线连接数 */
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) 
        this.session = session;
        String sessionId = session.getId();
        if(webSocketMap.containsKey(sessionId))
            //加入set中
            webSocketMap.remove(sessionId);
            webSocketMap.put(sessionId,this);
        else
            //加入set中
            webSocketMap.put(sessionId,this);
            // 在线数加1
            onlineCount.incrementAndGet();
        
        log.info("有新连接加入:,当前在线人数为:", session.getId(), onlineCount.get());
    

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() 
        if(webSocketMap.containsKey(session.getId()))
            webSocketMap.remove(session.getId());
            //从set中删除
            onlineCount.decrementAndGet(); // 在线数减1
        

        log.info("有一连接关闭:,当前在线人数为:", session.getId(), onlineCount.get());
    

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message
     *            客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) 
        messageMap.remove(session.getId());
        messageMap.put(session.getId(),message);
        log.info("服务端收到客户端[]的消息:", session.getId(), message);
//        this.sendMessage("Hello, " + message, session);
    

    @OnError
    public void onError(Session session, Throwable error) 
        log.error("发生错误");
        error.printStackTrace();
    

    /**
     * 服务端发送消息给客户端
     */
    public void sendMessage(String message, Session toSession) 
        try 
            log.info("服务端给客户端[]发送消息", toSession.getId(), message);
            toSession.getBasicRemote().sendText(message);
         catch (Exception e) 
            log.error("服务端发送消息给客户端失败:", e);
        
    

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException 
        synchronized (session) 
            this.session.getBasicRemote().sendText(message);
        
    

    public static ConcurrentHashMap<String, WebSocketService> getWebSocketMap() 
        return webSocketMap;
    

    public static ConcurrentHashMap<String,String> getMessageMap()
        return messageMap;
    


发送使用代码示例

    // Field declaration: belongs at class level of the calling bean.
    @Resource
    private WebSocketService webSocketService;

    // Statement: belongs inside a method of that bean. sendMessage(String) declares
    // IOException, so the enclosing method must catch or declare it.
    if (Func.isNotEmpty(jobMessages)) {
        webSocketService.sendMessage(JSONObject.toJSONString(jobMessages));
    }

以上是关于 SpringBoot 整合 WebSocket 服务代码教程的主要内容。如果未能解决你的问题,请参考以下文章:

SpringBoot 整合websocket

springboot整合websocket

SpringBoot整合Websocket实现即时聊天功能

Springboot整合Websocket遇到的坑

Websocket教程SpringBoot+Maven整合(目录)

springboot整合webSocket(看完即入门)