WebSocket Session共享

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WebSocket Session共享相关的知识,希望对你有一定的参考价值。

参考技术A 最近在做消息中心模块,想要实现消息实时推送到前端页面展示,直接摒弃了前端定时轮训调用接口来获取消息数据的方式,采用了WebSocket服务端推送。

流程是首先前端跟后端应用新建一个连接,并携带当前登录的用户ID,此时WebSocket会创建一个WebsocketSession来唯一绑定该连接,我们会在后端用Map建立用户ID与Session的映射关系:

后续有新消息到达时,就可以通过该Map映射找到指定用户ID对应的session来推送消息。但有一个问题,后端是多应用节点,每个节点维护一个这样的Map, 无法共享WebsocketSession ,而且 redis也无法对WebsocketSession序列化后进行存储 。

由于项目目前用到了Redis,所以可以 采用Redis的发布/订阅功能来实现WebsocketSession共享问题。

1.新建一个对象,属性有userId, message,用于发送消息

2. 当新消息到达时,将消息注册到redis指定topic的频道上

3.每个应用节点都订阅该topic的频道,这样新消息一注册,每个节点都能接收到Object,然后从Object中获取userId,再从映射Map中获取userId对应的WebsocketSession(在哪个节点建立的连接和Map映射关系,就会在哪个节点找到对应的session),进行消息推送。

就这样通过Redis的发布/订阅功能实现session共享。当然在步骤2,新消息到达时,可以先在本节点的Map映射中查找是否有userId对应的session,如果有,直接推送消息,而且不必要再将消息注册到redis中。

基于tomcat 7.0.68 的websocket 实现,及通过 HttpSessionId 实现websocket session 共享

0、先把页面调用代码贴出来好了

var historyWebsocket = null;
//  var websocketOnline = null;

      //判断当前浏览器是否支持WebSocket
    if('WebSocket' in window)
        historyWebsocket = new WebSocket("ws://ip:端口/应用名/historyWebsocket");
    
    else
        alert('Not support websocket')
    

  //连接发生错误的回调方法
    historyWebsocket.onerror = function()
        setMessageInnerHTML("error");
    ;

    //连接成功建立的回调方法
    historyWebsocket.onopen = function(event)
        send();
    
 //接收到消息的回调方法
    historyWebsocket.onmessage = function(event)

        var data = eval("("+event.data+")");//初始化数据
    

    //连接关闭的回调方法
    historyWebsocket.onclose = function()
        setMessageInnerHTML("close");
    


    //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
    window.onbeforeunload = function()
        historyWebsocket.close();
    

    //将消息显示在网页上
    function setMessageInnerHTML(innerHTML)
        $("#message").empty();
        document.getElementById('message').innerHTML +='<font color="#00CACA"class="message">' +innerHTML + '</font><br/>';
    

    //关闭连接
    function closeWebSocket()
        historyWebsocket.close();
    
    //发送消息
    function send()
//         var message = document.getElementById('text').value;
//         mesg="14701661345";

//          historyWebsocket.send(message);
    

1、onlineWebsocket.java

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;





//该注解用来指定一个URI,客户端可以通过这个URI来连接到WebSocket。类似Servlet的注解mapping。无需在web.xml中配置。
@ServerEndpoint(value="/onlineWebsocket",configurator=GetHttpSessionConfigurator.class)
public class OnlineStateWebSocket 
    private static final Logger LOG = LoggerFactory.getLogger(OnlineStateWebSocket.class);

    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;

    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
    private static CopyOnWriteArraySet<OnlineStateWebSocket> webSocketSet = new CopyOnWriteArraySet<OnlineStateWebSocket>();

    //用来存放 sim 卡号对应的 连接 session 的map  ,key 为sim 卡号,value 为各个终端的session 对象
    private static ConcurrentHashMap<String,ArrayList<OnlineStateWebSocket>> simSessionWebSocketMap = new ConcurrentHashMap<String,ArrayList<OnlineStateWebSocket>>();

    //用来存放 httpSessionId/session 的map  ,key 为httpSessionId ,value 为各个终端的session 对象
    private static ConcurrentHashMap<String,OnlineStateWebSocket> httpSessionWebSocketMap = new ConcurrentHashMap<String,OnlineStateWebSocket>();


    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    private HttpSession httpSession;

    /**
     * 连接建立成功调用的方法
     * @param session  可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(Session session, EndpointConfig config)
        this.session = session;
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在线数加1

        this.httpSession = (HttpSession) config.getUserProperties()
                .get(HttpSession.class.getName());
        //用于共享 httpSession
        httpSessionWebSocketMap.put(this.httpSession.getId(), this);


        System.out.println("实时数据:有新连接加入!当前在线人数为" + getOnlineCount());
    

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose()
        webSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1    

        Set<Entry<String,ArrayList<OnlineStateWebSocket>>> entrySet = simSessionWebSocketMap.entrySet();

        Iterator<Entry<String, ArrayList<OnlineStateWebSocket>>> iterator = entrySet.iterator();
        while(iterator.hasNext())

            Entry<String, ArrayList<OnlineStateWebSocket>> next = iterator.next();

            String key = next.getKey();    //获取sim卡号

            ArrayList<OnlineStateWebSocket> value = next.getValue(); //获取sim卡号对应的 连接的websocket终端

            if(value.contains(this))
                if(value.size()>1)
                    value.remove(this);
                    simSessionWebSocketMap.replace(key, value);
                else
                    simSessionWebSocketMap.remove(key);
                

            
        

        System.out.println("实时数据:有一连接关闭!当前在线人数为" + getOnlineCount());
    

    /**
     * 收到客户端消息后调用的方法
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @OnMessage
    public void onMessage(String message, Session session) 
        System.out.println("实时数据:来自客户端的消息:" + message);

        String[] SimArray = message.split(",");

        this.session = session;
        webSocketSet.add(this);     //加入set中

        for(String sim:SimArray)

             ArrayList<OnlineStateWebSocket> vehicleClientList ;

             if(simSessionWebSocketMap.get(sim.trim()) != null)
                vehicleClientList=simSessionWebSocketMap.get(sim.trim());   //取出当前sim 卡号所对应的所有session

                vehicleClientList.add(this);
                simSessionWebSocketMap.replace(sim.trim(), vehicleClientList);
             else
                vehicleClientList= new ArrayList<OnlineStateWebSocket>();
                vehicleClientList.add(this);

                 simSessionWebSocketMap.put(sim.trim(), vehicleClientList);
             

        

    

    /**
     * 发生错误时调用
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error)
        System.out.println("实时数据:发生错误");
        error.printStackTrace();
    

    /**
     * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
     * @param message
     * @throws IOException
     */
    public   void sendMessage(String message,HttpServletRequest request) throws IOException
        if(this.session.isOpen())
            this.session.getBasicRemote().sendText(message);
            this.session.setMaxTextMessageBufferSize(102400);
//          this.session.getAsyncRemote().sendText(message);
        

    

    public   void sendMessage(String message) throws IOException

        if(this.session.isOpen())
            this.session.getBasicRemote().sendText(message);
            this.session.setMaxTextMessageBufferSize(102400);
//          this.session.getAsyncRemote().sendText(message);
        
    

    public static synchronized int getOnlineCount() 
        return onlineCount;
    

    public static synchronized void addOnlineCount() 
        OnlineStateWebSocket.onlineCount++;
    

    public static synchronized void subOnlineCount() 
        OnlineStateWebSocket.onlineCount--;
    

    public static CopyOnWriteArraySet<OnlineStateWebSocket> getWebSocketSet() 
        return webSocketSet;
    

    public static void setWebSocketSet(CopyOnWriteArraySet<OnlineStateWebSocket> webSocketSet) 
        OnlineStateWebSocket.webSocketSet = webSocketSet;
    



2、RealWebsocket.java


import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;





//该注解用来指定一个URI,客户端可以通过这个URI来连接到WebSocket。类似Servlet的注解mapping。无需在web.xml中配置。
@ServerEndpoint(value ="/realWebsocket",configurator=GetHttpSessionConfigurator.class)
public class RealWebSocket 
    private static final Logger LOG = LoggerFactory.getLogger(RealLocationWebSocket.class);


    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;

    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
    private static CopyOnWriteArraySet<RealLocationWebSocket> webSocketSet = new CopyOnWriteArraySet<RealLocationWebSocket>();

    //用来存放 sim 卡号对应的 连接 session 的map  ,key 为sim 卡号,value 为各个终端的session 对象
    private static ConcurrentHashMap<String,ArrayList<RealLocationWebSocket>> simSessionWebSocketMap = new ConcurrentHashMap<String,ArrayList<RealLocationWebSocket>>();

    //用来存放 httpSessionId/session 的map  ,key 为httpSessionId ,value 为各个终端的session 对象
    private static ConcurrentHashMap<String,RealLocationWebSocket> httpSessionWebSocketMap = new ConcurrentHashMap<String,RealLocationWebSocket>();


    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    private HttpSession httpSession;

    /**
     * 连接建立成功调用的方法
     * @param session  可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(Session session, EndpointConfig config)
        this.session = session;
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在线数加1


        this.httpSession = (HttpSession) config.getUserProperties()
                .get(HttpSession.class.getName());
        //用于共享 httpSession
        httpSessionWebSocketMap.put(this.httpSession.getId(), this);

        System.out.println("实时数据:有新连接加入!当前在线人数为" + getOnlineCount());
    

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose()
        webSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1    

        Set<Entry<String,ArrayList<RealLocationWebSocket>>> entrySet = simSessionWebSocketMap.entrySet();

        Iterator<Entry<String, ArrayList<RealLocationWebSocket>>> iterator = entrySet.iterator();
        while(iterator.hasNext())

            Entry<String, ArrayList<RealLocationWebSocket>> next = iterator.next();

            String key = next.getKey();    //获取sim卡号

            ArrayList<RealLocationWebSocket> value = next.getValue(); //获取sim卡号对应的 连接的websocket终端

            if(value.contains(this))
                if(value.size()>1)
                    value.remove(this);
                    simSessionWebSocketMap.replace(key, value);
                else
                    simSessionWebSocketMap.remove(key);
                

            
        


        System.out.println("实时数据:有一连接关闭!当前在线人数为" + getOnlineCount());
    

    /**
     * 收到客户端消息后调用的方法
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @OnMessage
    public void onMessage(String message, Session session) 
        System.out.println("实时数据:来自客户端的消息:" + message);


        String[] SimArray = message.split(",");

        this.session = session;
        webSocketSet.add(this);     //加入set中

        for(String sim:SimArray)

             ArrayList<RealLocationWebSocket> vehicleClientList ;

             if(simSessionWebSocketMap.get(sim.trim()) != null)
                vehicleClientList=simSessionWebSocketMap.get(sim.trim());   //取出当前sim 卡号所对应的所有session
                vehicleClientList.add(this);
                simSessionWebSocketMap.replace(sim.trim(), vehicleClientList);
             else
                vehicleClientList= new ArrayList<RealLocationWebSocket>();
                vehicleClientList.add(this);

                 simSessionWebSocketMap.put(sim.trim(), vehicleClientList);
             

        

    

    /**
     * 发生错误时调用
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error)
        System.out.println("实时数据:发生错误");
        error.printStackTrace();
    

    /**
     * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
     * @param message
     * @throws IOException
     */
    public   void sendMessage(String message,HttpServletRequest request) throws IOException
        if(this.session.isOpen())
            this.session.getBasicRemote().sendText(message);
            this.session.setMaxTextMessageBufferSize(102400);
//          this.session.getAsyncRemote().sendText(message);

        

    

    public   void sendMessage(String message) throws IOException

        if(this.session.isOpen())
            this.session.getBasicRemote().sendText(message);
            this.session.setMaxTextMessageBufferSize(102400);
//          this.session.getAsyncRemote().sendText(message);
        
    

    public static synchronized int getOnlineCount() 
        return onlineCount;
    

    public static synchronized void addOnlineCount() 
        RealLocationWebSocket.onlineCount++;
    

    public static synchronized void subOnlineCount() 
        RealLocationWebSocket.onlineCount--;
    

    public static CopyOnWriteArraySet<RealLocationWebSocket> getWebSocketSet() 
        return webSocketSet;
    

    public static void setWebSocketSet(CopyOnWriteArraySet<RealLocationWebSocket> webSocketSet) 
        RealLocationWebSocket.webSocketSet = webSocketSet;
    



3、最后就是根据httpSessionId 实现 websocket session 共享了

首先就是实现 在RealWebSocket.java 中和 OnlineWebsocket 中都出现了的 GetHttpSessionConfigurator.class

import java.util.ArrayList;
import java.util.List;

import javax.servlet.http.HttpSession;
import javax.websocket.Extension;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;

public class GetHttpSessionConfigurator extends ServerEndpointConfig.Configurator


    @Override
    public <T> T getEndpointInstance(Class<T> clazz)
            throws InstantiationException 
        try 
            return clazz.newInstance();
         catch (IllegalAccessException e) 
            InstantiationException ie = new InstantiationException();
            ie.initCause(e);
            throw ie;
        
    


    @Override
    public String getNegotiatedSubprotocol(List<String> supported,
            List<String> requested) 

        for (String request : requested) 
            if (supported.contains(request)) 
                return request;
            
        
        return "";
    


    @Override
    public List<Extension> getNegotiatedExtensions(List<Extension> installed,
            List<Extension> requested) 

        List<Extension> result = new ArrayList<>();
        for (Extension request : requested) 
            if (installed.contains(request)) 
                result.add(request);
            
        
        return result;
    


    @Override
    public boolean checkOrigin(String originHeaderValue) 
        return true;
    

    @Override
    public void modifyHandshake(ServerEndpointConfig config, 
                                HandshakeRequest request, 
                                HandshakeResponse response)
    
        HttpSession httpSession = (HttpSession)request.getHttpSession();
        config.getUserProperties().put(HttpSession.class.getName(),httpSession);
    

在上面这个过滤器里面,如果不出意外,你是会报一个空指针的,所以呢,在这里要修复一下

import javax.servlet.ServletRequestEvent;
import javax.servlet.ServletRequestListener;
import javax.servlet.annotation.WebListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

@WebListener
public class RequestListener implements ServletRequestListener 

    @Override
    public void requestDestroyed(ServletRequestEvent sre) 
        // TODO Auto-generated method stub

    

    @Override
    public void requestInitialized(ServletRequestEvent sre) 
        HttpSession session = ((HttpServletRequest) sre.getServletRequest()).getSession();
    


另外要将这个过滤器加到web.xml 中

    <listener>
        <listener-class>com.vshop.sys.listener.RequestListener</listener-class>
    </listener>

这样完了之后,应该就不会有什么问题了

就是通过 OnlineWebsocket.java 和 RealWebsocket .java中的httpSessionWebSocketMap 变量实现 两个 websocket Session 共享的

以上是关于WebSocket Session共享的主要内容,如果未能解决你的问题,请参考以下文章

基于tomcat 7.0.68 的websocket 实现,及通过 HttpSessionId 实现websocket session 共享

基于tomcat 7.0.68 的websocket 实现,及通过 HttpSessionId 实现websocket session 共享

基于tomcat 7.0.68 的websocket 实现,及通过 HttpSessionId 实现websocket session 共享

Redis+Tomcat+Nginx集群实现Session共享,Tomcat Session共享

Tomcat集群通过redis实现session共享

Redis解决websocket在分布式场景下session共享问题