Web在线聊天室(12) --- 收发消息(单例模式+阻塞式队列)

Posted 满眼*星辰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Web在线聊天室(12) --- 收发消息(单例模式+阻塞式队列)相关的知识,希望对你有一定的参考价值。

websocket的理解

基于代码理解

在这里插入图片描述

客户端和服务端建立连接,就产生一个新的webSocket对象,所以不存在线程安全问题(Servlet是一个实例对象,多线程操作一个对象存在线程安全问题)

理论上的实现

异步,事件驱动,io模型的理论原理

在这里插入图片描述
从网卡拿到数据,然后系统决定数据怎么交给进程,都是io的操作,由io的模型来决定如何交付数据

websocket的实现,是基于事件驱动,异步的方式(原理叫作io多路复用)

注册一个事件后,java进程(web项目不用一直阻塞等待io数据返回)【程序该做啥就做啥,系统接收到网卡传到对应端口进程的数据,websocket某个事件发生以后,由系统调用websocket的回调函数】

我(java进程运行web项目)挂了 n 个鱼竿,都绑定铃铛(注册事件),如果有鱼上钩(事件发生),铃铛会通知我,我就再把对应鱼竿的鱼处理好(执行回调函数)

websocket协议的原理

建立连接基于http协议,(包含升级协议字段)

传输数据:基于传输层tcp,应用层websocket协议,长连接,双方都保持连接状态,双方都可以收发消息

服务器接收消息的处理方式

存在问题

服务器端onMessage方法

    @OnMessage
    public void onMessage(Session session, String message) {
        // 1.遍历保存的所有session,每个都发送消息
        MessageCenter.sendMessage(message);
        // 2.消息还要保存在数据库,
        // (1)反序列化json字符串为message对象
        Message msg = Util.deserialize(message, Message.class);
        // (2)message设置接收消息的时间
//        msg.setSendTime(new Date());
        // (3)插入数据库
        int n = MessageDao.insert(msg);

        System.out.println("接收到的消息:" + message);
    }

具体发送消息sendMessage方法

    public static void sendMessage(String message) {
        try {
            //遍历map的所有数据,然后转发到所有session对象中
            Enumeration<Session> e = clients.elements();
            while (e.hasMoreElements()) {
                Session session = e.nextElement();
                session.getBasicRemote().sendText(message);
            }
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }

遍历所有客户端,如果客户端数量很多,sendMessage方法长时间无法返回
|
@OnMessage注解方法无法返回(接收某个客户端消息处于长时间的处理执行上)
|
大量的接收客户端消息时,产生大量的消息堆积在服务端,服务端资源占用太多

解决方案

接收消息,发送消息的步骤,分离开(用不同的线程来处理,双方可以并发,并行的执行)

阻塞队列:用来存放消息,接收的客户端消息就放在里面(放进去时很快的)

再启动一个线程,不停的拉取队列中的消息,发送(发送和接收并发并行执行,分离)

改造方案:基于阻塞队列,将接收消息和发送消息两个操作分隔开,(异步消息处理)

目的:大量消息接收时,可以降低服务器压力,达到消峰的目的

处理消息的仓库messageCenter

把该类升级为单例模式(懒汉模式)

package org.example.model;

import javax.websocket.Session;
import java.io.IOException;
import java.util.Enumeration;
import java.util.concurrent.*;

/**
 * Created with IntelliJ IDEA.
 * Description:保存websocket需要的信息(所有客户端session)
 * User: starry
 * Date: 2021 -05 -27
 * Time: 9:48
 */
public class MessageCenter {

    /**
     * 支持线程安全的map结构,并且满足高并发(读写,读读并发,写写互斥,加锁粒度)
     */
    private static final ConcurrentHashMap<Integer, Session>  clients = new ConcurrentHashMap<>();

    /**
     * 阻塞队列(无边界的队列)
     */
    private static BlockingQueue<String> queue = new LinkedBlockingQueue<>();


    private static volatile MessageCenter center;

    private MessageCenter(){}

    public static MessageCenter getInstance() {
        //单例模式,双重效验锁的单例模式
        if (center == null) {
            synchronized (MessageCenter.class) {
                if (center == null) {
                    center = new MessageCenter();
                    new Thread(() -> {  //启动一个线程,不停地从阻塞队列中拿数据
                        while (true) {
                            try {
                                String message = queue.take();  //阻塞式获取数据,如果队列为空,阻塞等待
                                sendMessage(message);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }).start();
                }
            }
        }
        return center;
    }

    /**
     * 不直接发消息,先把消息存放再队列中,由另一个线程去发
     */
    public void addMessage(String message) {
        queue.add(message);
    }

    /**
     * websocket建立连接时,添加用户id和客户端session,保存起来
     */
    public void addOnlineUser(Integer userId, Session session) {
        clients.put(userId,session);
    }

    /**
     * 关闭websocket连接,和出错时,删除客户端session
     */
    public void delOnlineUser(Integer userId) {
        clients.remove(userId);
    }

    /**
     * 接收到某用户的消息时,转发到所有客户端
     * 存在一个消息转发所有客户端,存在新能问题
     * 如果接收到的信息数量m很多,同时在线的用户数量n也很多,
     * 那么要转发的次数就是m*n次,每个接收消息都是一个线程,
     * 都要等待websocket中的onmessage回调方法执行完,性能差
     * 优化(使用阻塞队列的方式解决:并行并发的执行任务提交和执行任务)
     */
    public static void sendMessage(String message) {
        try {
            //遍历map的所有数据,然后转发到所有session对象中
            Enumeration<Session> e = clients.elements();
            while (e.hasMoreElements()) {
                Session session = e.nextElement();
                session.getBasicRemote().sendText(message);
            }
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }

}

服务器webSocket

package org.example.servlet;

import org.example.dao.MessageDao;
import org.example.dao.UserDao;
import org.example.model.Message;
import org.example.model.MessageCenter;
import org.example.util.Util;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;

/**
 * Created with IntelliJ IDEA.
 * Description:服务器websocket
 * User: starry
 * Date: 2021 -05 -27
 * Time: 8:54
 */

@ServerEndpoint("/message/{userId}")
public class MessageWebsocket {

    @OnOpen
    public void onOpen(@PathParam("userId") Integer userId, Session session) throws IOException {
        // 1.把每个客户端的session都保存起来,之后转发消息到所有客户端要用
//        MessageCenter.addOnlineUser(userId,session);
        MessageCenter.getInstance().addOnlineUser(userId,session);
        // 2.查询本客户端(用户)上次登录前的消息(数据库查)
        List<Message> list = MessageDao.queryByLastLogout(userId);
        // 3.发送当前用户在上次登录后的消息
        for (Message m : list) {
            session.getBasicRemote().sendText(Util.serialize(m));
        }
        System.out.println("建立连接" + userId);
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        // 1.遍历保存的所有session,每个都发送消息
//        MessageCenter.sendMessage(message);
        MessageCenter.getInstance().addMessage(message);
        // 2.消息还要保存在数据库,
        // (1)反序列化json字符串为message对象
        Message msg = Util.deserialize(message, Message.class);
        // (2)message设置接收消息的时间
//        msg.setSendTime(new Date());
        // (3)插入数据库
        int n = MessageDao.insert(msg);

        System.out.println("接收到的消息:" + message);
    }

    @OnClose
    public void onClose(@PathParam("userId") Integer userId) {
        //1.本客户端关闭连接,要在之前保存的session集合中,删除
//        MessageCenter.delOnlineUser(userId);
        MessageCenter.getInstance().delOnlineUser(userId);
        //2.建立连接要获取用户上次登录以后的消息,所以关闭长连接就是代表用户退出
        //更新用户的上次登录时间
        int n = UserDao.updateLastLogout(userId);
        System.out.println("关闭连接");
    }

    @OnError
    public void onError(@PathParam("userId") Integer userId, Throwable throwable) {
        System.out.println("出错了");
        //和关闭连接的操作一样
//        MessageCenter.delOnlineUser(userId);
        MessageCenter.getInstance().delOnlineUser(userId);
        throwable.printStackTrace();
    }

}

实现效果

每个客户端可以同时收发消息,并且用户极多,同时发消息也可以正常执行(阻塞式队列)
在这里插入图片描述

在这里插入图片描述

以上是关于Web在线聊天室(12) --- 收发消息(单例模式+阻塞式队列)的主要内容,如果未能解决你的问题,请参考以下文章

Web在线聊天室 --- 服务器中换收发消息

Web在线聊天室 --- 服务器中换收发消息

在线聊天室 --- 需求分析及准备工作

如何基于 ZEGO SDK 实现 Web 基本消息收发

Web在线聊天室(10) --- 插入消息

Web在线聊天室(10) --- 插入消息