SpringBoot整合Websocket实现即时聊天功能

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合Websocket实现即时聊天功能相关的知识,希望对你有一定的参考价值。

参考技术A 近期,公司需要新增即时聊天的业务,于是用websocket 整合到Springboot完成业务的实现。

一、我们来简单的介绍下websocket的交互原理:

1.客户端先服务端发起websocket请求;

2.服务端接收到请求之后,把请求响应返回给客户端;

3.客户端与服务端只需要一次握手即可完成交互通道;

  二、webscoket支持的协议:基于TCP协议下,http协议和https协议;

  http协议 springboot不需要做任何的配置 

  https协议则需要配置nignx代理,注意证书有效的问题  ---在这不做详细说明

  三、开始我们的实现java后端的实现

  1.添加依赖

  <!-- websocket -->

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-websocket</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework</groupId>

            <artifactId>spring-websocket</artifactId>

            <version>$spring.version</version>

        </dependency>

        <dependency>

            <groupId>org.springframework</groupId>

            <artifactId>spring-messaging</artifactId>

            <version>$spring.version</version>

        </dependency>

        <!-- WebSocket -->

  2.配置config

@ConditionalOnWebApplication

@Configuration

@EnableWebSocketMessageBroker

public class WebSocketConfig extends AbstractSessionWebSocketMessageBrokerConfigurer

    @Bean

    public ServerEndpointExporter serverEndpointExporter()

        return  new ServerEndpointExporter();

   

    @Bean

    public CustomSpringConfigurator customSpringConfigurator()

        return new CustomSpringConfigurator();

   

    @Override

    protected void configureStompEndpoints(StompEndpointRegistry registry)

        registry.addEndpoint("/websocket").setAllowedOrigins("*")

                .addInterceptors(new HttpSessionHandshakeInterceptor()).withSockJS();

   

public class CustomSpringConfigurator extends ServerEndpointConfig.Configurator implements ApplicationContextAware

    private static volatile BeanFactory context;

    @Override

    public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException

        return context.getBean(clazz);

   

    @Override

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException

        CustomSpringConfigurator.context = applicationContext;

   

    @Override

    public void modifyHandshake(ServerEndpointConfig sec,

                                HandshakeRequest request, HandshakeResponse response)

        super.modifyHandshake(sec,request,response);

        HttpSession httpSession=(HttpSession) request.getHttpSession();

        if(httpSession!=null)

            sec.getUserProperties().put(HttpSession.class.getName(),httpSession);

       

   



@SpringBootApplication

@EnableCaching

@ComponentScan("com")

@EnableWebSocket

public class Application extends SpringBootServletInitializer

static final Logger logger = LoggerFactory.getLogger(Application.class);

    @Override

    protected SpringApplicationBuilder configure(SpringApplicationBuilder application)

        return application.sources(Application.class);

   

需要注意的是: @EnableWebSocket  一定要加在启动类上,不然springboot无法对其扫描进行管理;

@SeverEndpoint --将目标类定义成一个websocket服务端,注解对应的值将用于监听用户连接的终端访问地址,客户端可以通过URL来连接到websocket服务端。

四、设计思路:用map<房间id, 用户set>来保存房间对应的用户连接列表,当有用户进入一个房间的时候,就会先检测房间是否存在,如果不存在那就新建一个空的用户set,再加入本身到这个set中,确保不同房间号里的用户session不串通!

/**

* Create by wushuyu

* on 2020/4/30 13:24

*

*/

@ServerEndpoint(value = "/websocket/roomName", configurator = CustomSpringConfigurator.class)

@Component

public class WebSocketRoom

    //连接超时--一天

    private static final long MAX_TIME_OUT = 24*60*60*1000;

    // key为房间号,value为该房间号的用户session

    private static final Map<String, Set<Session>> rooms = new ConcurrentHashMap<>();

    //将用户的信息存储在一个map集合里

    private static final Map<String, Object> users = new ConcurrentHashMap<>();

/**

*roomName 使用通用跳转,实现动态获取房间号和用户信息  格式:roomId|xx|xx

*/

    @OnOpen 

    public void connect(@PathParam("roomName") String roomName, Session session)

        String roomId = roomName.split("[|]")[0];

        String nickname = roomName.split("[|]")[1];

        String loginId = roomName.split("[|]")[2];

        //设置连接超时时间

            session.setMaxIdleTimeout(MAX_TIME_OUT);

        try

          //可实现业务逻辑

           

            // 将session按照房间名来存储,将各个房间的用户隔离

            if (!rooms.containsKey(roomId))

                // 创建房间不存在时,创建房间

                Set<Session> room = new HashSet<>();

                // 添加用户

                room.add(session);

                rooms.put(roomId, room);

            else // 房间已存在,直接添加用户到相应的房间             

                if (rooms.values().contains(session)) //如果房间里有此session直接不做操作

                else //不存在则添加

                    rooms.get(roomId).add(session);

               

           

            JSONObject jsonObject = new JSONObject();

            -----

            //根据自身业务情况实现业务

            -----

            users.put(session.getId(), jsonObject);

            //向在线的人发送当前在线的人的列表    -------------可有可无,看业务需求

            List<ChatMessage> userList = new LinkedList<>();

            rooms.get(roomId)

                    .stream()

                    .map(Session::getId)

                    .forEach(s ->

                        ChatMessage chatMessage = new ChatMessage();

                        chatMessage.setDate(new Date());

                        chatMessage.setStatus(1);

                        chatMessage.setChatContent(users.get(s));

                        chatMessage.setMessage("");

                        userList.add(chatMessage);

                    );

//        session.getBasicRemote().sendText(JSON.toJSONString(userList));

            //向房间的所有人群发谁上线了

            ChatMessage chatMessage = new ChatMessage();  ----将聊天信息封装起来。

            chatMessage.setDate(new Date());

            chatMessage.setStatus(1);

            chatMessage.setChatContent(users.get(session.getId()));

            chatMessage.setMessage("");

            broadcast(roomId, JSON.toJSONString(chatMessage));

            broadcast(roomId, JSON.toJSONString(userList));

        catch (Exception e)

            e.printStackTrace();

       

   

    @OnClose

    public void disConnect(@PathParam("roomName") String roomName, Session session)

        String roomId = roomName.split("[|]")[0];

        String loginId = roomName.split("[|]")[2];

        try

            rooms.get(roomId).remove(session);

            ChatMessage chatMessage = new ChatMessage();

            chatMessage.setDate(new Date());

            chatMessage.setUserName(user.getRealname());

            chatMessage.setStatus(0);

            chatMessage.setChatContent(users.get(session.getId()));

            chatMessage.setMessage("");

            users.remove(session.getId());

            //向在线的人发送当前在线的人的列表  ----可有可无,根据业务要求

            List<ChatMessage> userList = new LinkedList<>();

            rooms.get(roomId)

                    .stream()

                    .map(Session::getId)

                    .forEach(s ->

                        ChatMessage chatMessage1 = new ChatMessage();

                        chatMessage1.setDate(new Date());

                        chatMessage1.setUserName(user.getRealname());

                        chatMessage1.setStatus(1);

                        chatMessage1.setChatContent(users.get(s));

                        chatMessage1.setMessage("");

                        userList.add(chatMessage1);

                    );

            broadcast(roomId, JSON.toJSONString(chatMessage));

            broadcast(roomId, JSON.toJSONString(userList));

        catch (Exception e)

            e.printStackTrace();

       

   

    @OnMessage

    public void receiveMsg( String msg, Session session)

        try

                ChatMessage chatMessage = new ChatMessage();

                chatMessage.setUserName(user.getRealname());

                chatMessage.setStatus(2);

                chatMessage.setChatContent(users.get(session.getId()));

                chatMessage.setMessage(msg);

                // 按房间群发消息

                broadcast(roomId, JSON.toJSONString(chatMessage));

           

        catch (IOException e)

            e.printStackTrace();

       

   

    // 按照房间名进行群发消息

    private void broadcast(String roomId, String msg)

        rooms.get(roomId).forEach(s ->

            try

                s.getBasicRemote().sendText(msg);  -----此还有一个getAsyncRemote() 

            catch (IOException e)

                e.printStackTrace();

           

        );

   

    @OnError

    public void onError(Throwable error)

        error.printStackTrace();

   



友情提示:此session是websocket里的session,并非httpsession;

springboot整合websocket

一、背景

??我们都知道http协议只能浏览器单方面向服务器发起请求获得响应,服务器不能主动向浏览器推送消息。想要实现浏览器的主动推送有两种主流实现方式:

  • 轮询:缺点很多,但是实现简单
  • websocket:在浏览器和服务器之间建立tcp连接,实现全双工通信

??springboot使用websocket有两种方式,一种是实现简单的websocket,另外一种是实现STOMP协议。这一篇实现简单的websocket,STOMP下一篇在讲。

注意:如下都是针对使用springboot内置容器

二、实现

1、依赖引入

??要使用websocket关键是@ServerEndpoint这个注解,该注解是javaee标准中的注解,tomcat7及以上已经实现了,如果使用传统方法将war包部署到tomcat中,只需要引入如下javaee标准依赖即可:

<dependency>
  <groupId>javax</groupId>
  <artifactId>javaee-api</artifactId>
  <version>7.0</version>
  <scope>provided</scope>
</dependency>

如使用springboot内置容器,无需引入,springboot已经做了包含。我们只需引入如下依赖即可:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>1.5.3.RELEASE</version>
    <type>pom</type>
</dependency>

2、注入Bean

??首先注入一个ServerEndpointExporterBean,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint。代码如下:

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

3、申明endpoint

??建立MyWebSocket.java类,在该类中处理websocket逻辑

@ServerEndpoint(value = "/websocket") //接受websocket请求路径
@Component  //注册到spring容器中
public class MyWebSocket {


    //保存所有在线socket连接
    private static Map<String,MyWebSocket> webSocketMap = new LinkedHashMap<>();

    //记录当前在线数目
    private static int count=0;

    //当前连接(每个websocket连入都会创建一个MyWebSocket实例
    private Session session;

    private Logger log = LoggerFactory.getLogger(this.getClass());
    //处理连接建立
    @OnOpen
    public void onOpen(Session session){
        this.session=session;
        webSocketMap.put(session.getId(),this);
        addCount();
        log.info("新的连接加入:{}",session.getId());
    }

    //接受消息
    @OnMessage
    public void onMessage(String message,Session session){
        log.info("收到客户端{}消息:{}",session.getId(),message);
        try{
            this.sendMessage("收到消息:"+message);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    //处理错误
    @OnError
    public void onError(Throwable error,Session session){
        log.info("发生错误{},{}",session.getId(),error.getMessage());
    }

    //处理连接关闭
    @OnClose
    public void onClose(){
        webSocketMap.remove(this.session.getId());
        reduceCount();
        log.info("连接关闭:{}",this.session.getId());
    }

    //群发消息

    //发送消息
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    //广播消息
    public static void broadcast(){
        MyWebSocket.webSocketMap.forEach((k,v)->{
            try{
                v.sendMessage("这是一条测试广播");
            }catch (Exception e){
            }
        });
    }

    //获取在线连接数目
    public static int getCount(){
        return count;
    }

    //操作count,使用synchronized确保线程安全
    public static synchronized void addCount(){
        MyWebSocket.count++;
    }

    public static synchronized void reduceCount(){
        MyWebSocket.count--;
    }
}

4、客户的实现

??客户端使用h5原生websocket,部分浏览器可能不支持。代码如下:

<html>

<head>
    <title>websocket测试</title>
    <meta charset="utf-8">
</head>

<body>
    <button onclick="sendMessage()">测试</button>
    <script>
        let socket = new WebSocket("ws://localhost:8080/websocket");
        socket.onerror = err => {
            console.log(err);
        };
        socket.onopen = event => {
            console.log(event);
        };
        socket.onmessage = mess => {
            console.log(mess);
        };
        socket.onclose = () => {
            console.log("连接关闭");
        };

        function sendMessage() {
            if (socket.readyState === 1)
                socket.send("这是一个测试数据");
            else
                alert("尚未建立websocket连接");
        }
    </script>
</body>

</html>

三、测试

??建立一个controller测试群发,代码如下:

@RestController
public class HomeController {

    @GetMapping("/broadcast")
    public void broadcast(){
        MyWebSocket.broadcast();
    }
}

然后打开上面的html,可以看到浏览器和服务器都输出连接成功的信息:

浏览器:
Event {isTrusted: true, type: "open", target: WebSocket, currentTarget: WebSocket, eventPhase: 2, …}

服务端:
2018-08-01 14:05:34.727  INFO 12708 --- [nio-8080-exec-1] com.fxb.h5websocket.MyWebSocket          : 新的连接加入:0

点击测试按钮,可在服务端看到如下输出:

2018-08-01 15:00:34.644  INFO 12708 --- [nio-8080-exec-6] com.fxb.h5websocket.MyWebSocket          : 收到客户端2消息:这是一个测试数据

再次打开html页面,这样就有两个websocket客户端,然后在浏览器访问localhost:8080/broadcast测试群发功能,每个客户端都会输出如下信息:

MessageEvent {isTrusted: true, data: "这是一条测试广播", origin: "ws://localhost:8080", lastEventId: "", source: null, …}



??源码可在github上下载,记得点赞,star哦



以上是关于SpringBoot整合Websocket实现即时聊天功能的主要内容,如果未能解决你的问题,请参考以下文章

springboot整合websocket

开发经验springboot整合websocket实现群聊

springboot整合websocket实现登录挤退现象

springboot整合websocket实现一对一消息推送和广播消息推送

SpringBoot 整合WebSocket 实现简单聊天室

springboot整合websocket实现客户端与服务端通信