Web在线聊天室(12) --- 收发消息(单例模式+阻塞式队列)
Posted 满眼*星辰
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Web在线聊天室(12) --- 收发消息(单例模式+阻塞式队列)相关的知识,希望对你有一定的参考价值。
目录
websocket的理解
基于代码理解
客户端和服务端建立连接,就产生一个新的webSocket对象,所以不存在线程安全问题(Servlet是一个实例对象,多线程操作一个对象存在线程安全问题)
理论上的实现
异步,事件驱动,io模型的理论原理
从网卡拿到数据,然后系统决定数据怎么交给进程,都是io的操作,由io的模型来决定如何交付数据
websocket的实现,是基于事件驱动,异步的方式(原理叫作io多路复用)
注册一个事件后,java进程(web项目不用一直阻塞等待io数据返回)【程序该做啥就做啥,系统接收到网卡传到对应端口进程的数据,websocket某个事件发生以后,由系统调用websocket的回调函数】
我(java进程运行web项目)挂了 n 个鱼竿,都绑定铃铛(注册事件),如果有鱼上钩(事件发生),铃铛会通知我,我就再把对应鱼竿的鱼处理好(执行回调函数)
websocket协议的原理
建立连接基于http协议,(包含升级协议字段)
传输数据:基于传输层tcp,应用层websocket协议,长连接,双方都保持连接状态,双方都可以收发消息
服务器接收消息的处理方式
存在问题
服务器端onMessage方法
@OnMessage
public void onMessage(Session session, String message)
// 1.遍历保存的所有session,每个都发送消息
MessageCenter.sendMessage(message);
// 2.消息还要保存在数据库,
// (1)反序列化json字符串为message对象
Message msg = Util.deserialize(message, Message.class);
// (2)message设置接收消息的时间
// msg.setSendTime(new Date());
// (3)插入数据库
int n = MessageDao.insert(msg);
System.out.println("接收到的消息:" + message);
具体发送消息sendMessage方法
public static void sendMessage(String message)
try
//遍历map的所有数据,然后转发到所有session对象中
Enumeration<Session> e = clients.elements();
while (e.hasMoreElements())
Session session = e.nextElement();
session.getBasicRemote().sendText(message);
catch (IOException ioException)
ioException.printStackTrace();
遍历所有客户端,如果客户端数量很多,sendMessage方法长时间无法返回
|
@OnMessage注解方法无法返回(接收某个客户端消息处于长时间的处理执行上)
|
大量的接收客户端消息时,产生大量的消息堆积在服务端,服务端资源占用太多
解决方案
接收消息,发送消息的步骤,分离开(用不同的线程来处理,双方可以并发,并行的执行)
阻塞队列:用来存放消息,接收的客户端消息就放在里面(放进去时很快的)
再启动一个线程,不停的拉取队列中的消息,发送(发送和接收并发并行执行,分离)
改造方案:基于阻塞队列,将接收消息和发送消息两个操作分隔开,(异步消息处理)
目的:大量消息接收时,可以降低服务器压力,达到消峰的目的
处理消息的仓库messageCenter
把该类升级为单例模式(懒汉模式)
package org.example.model;
import javax.websocket.Session;
import java.io.IOException;
import java.util.Enumeration;
import java.util.concurrent.*;
/**
* Created with IntelliJ IDEA.
* Description:保存websocket需要的信息(所有客户端session)
* User: starry
* Date: 2021 -05 -27
* Time: 9:48
*/
public class MessageCenter
/**
* 支持线程安全的map结构,并且满足高并发(读写,读读并发,写写互斥,加锁粒度)
*/
private static final ConcurrentHashMap<Integer, Session> clients = new ConcurrentHashMap<>();
/**
* 阻塞队列(无边界的队列)
*/
private static BlockingQueue<String> queue = new LinkedBlockingQueue<>();
private static volatile MessageCenter center;
private MessageCenter()
public static MessageCenter getInstance()
//单例模式,双重效验锁的单例模式
if (center == null)
synchronized (MessageCenter.class)
if (center == null)
center = new MessageCenter();
new Thread(() -> //启动一个线程,不停地从阻塞队列中拿数据
while (true)
try
String message = queue.take(); //阻塞式获取数据,如果队列为空,阻塞等待
sendMessage(message);
catch (InterruptedException e)
e.printStackTrace();
).start();
return center;
/**
* 不直接发消息,先把消息存放再队列中,由另一个线程去发
*/
public void addMessage(String message)
queue.add(message);
/**
* websocket建立连接时,添加用户id和客户端session,保存起来
*/
public void addOnlineUser(Integer userId, Session session)
clients.put(userId,session);
/**
* 关闭websocket连接,和出错时,删除客户端session
*/
public void delOnlineUser(Integer userId)
clients.remove(userId);
/**
* 接收到某用户的消息时,转发到所有客户端
* 存在一个消息转发所有客户端,存在新能问题
* 如果接收到的信息数量m很多,同时在线的用户数量n也很多,
* 那么要转发的次数就是m*n次,每个接收消息都是一个线程,
* 都要等待websocket中的onmessage回调方法执行完,性能差
* 优化(使用阻塞队列的方式解决:并行并发的执行任务提交和执行任务)
*/
public static void sendMessage(String message)
try
//遍历map的所有数据,然后转发到所有session对象中
Enumeration<Session> e = clients.elements();
while (e.hasMoreElements())
Session session = e.nextElement();
session.getBasicRemote().sendText(message);
catch (IOException ioException)
ioException.printStackTrace();
服务器webSocket
package org.example.servlet;
import org.example.dao.MessageDao;
import org.example.dao.UserDao;
import org.example.model.Message;
import org.example.model.MessageCenter;
import org.example.util.Util;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
/**
* Created with IntelliJ IDEA.
* Description:服务器websocket
* User: starry
* Date: 2021 -05 -27
* Time: 8:54
*/
@ServerEndpoint("/message/userId")
public class MessageWebsocket
@OnOpen
public void onOpen(@PathParam("userId") Integer userId, Session session) throws IOException
// 1.把每个客户端的session都保存起来,之后转发消息到所有客户端要用
// MessageCenter.addOnlineUser(userId,session);
MessageCenter.getInstance().addOnlineUser(userId,session);
// 2.查询本客户端(用户)上次登录前的消息(数据库查)
List<Message> list = MessageDao.queryByLastLogout(userId);
// 3.发送当前用户在上次登录后的消息
for (Message m : list)
session.getBasicRemote().sendText(Util.serialize(m));
System.out.println("建立连接" + userId);
@OnMessage
public void onMessage(Session session, String message)
// 1.遍历保存的所有session,每个都发送消息
// MessageCenter.sendMessage(message);
MessageCenter.getInstance().addMessage(message);
// 2.消息还要保存在数据库,
// (1)反序列化json字符串为message对象
Message msg = Util.deserialize(message, Message.class);
// (2)message设置接收消息的时间
// msg.setSendTime(new Date());
// (3)插入数据库
int n = MessageDao.insert(msg);
System.out.println("接收到的消息:" + message);
@OnClose
public void onClose(@PathParam("userId") Integer userId)
//1.本客户端关闭连接,要在之前保存的session集合中,删除
// MessageCenter.delOnlineUser(userId);
MessageCenter.getInstance().delOnlineUser(userId);
//2.建立连接要获取用户上次登录以后的消息,所以关闭长连接就是代表用户退出
//更新用户的上次登录时间
int n = UserDao.updateLastLogout(userId);
System.out.println("关闭连接");
@OnError
public void onError(@PathParam("userId") Integer userId, Throwable throwable)
System.out.println("出错了");
//和关闭连接的操作一样
// MessageCenter.delOnlineUser(userId);
MessageCenter.getInstance().delOnlineUser(userId);
throwable.printStackTrace();
实现效果
每个客户端可以同时收发消息,并且用户极多,同时发消息也可以正常执行(阻塞式队列)
以上是关于Web在线聊天室(12) --- 收发消息(单例模式+阻塞式队列)的主要内容,如果未能解决你的问题,请参考以下文章