学习Pushlet:pushlet多线程实现

Posted liuyuan1227

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了学习Pushlet:pushlet多线程实现相关的知识,希望对你有一定的参考价值。

使用多线程的时候,就需要我们前面看的源码怎么发送请求,和后台的serlet的代码,因为需要自己重新手写实现我们的需求。

首先整体思路是发送一个请求到我们自己的serlet,然后不同的用户订阅不同的事件(userId),然后每个线程管理自己的事件和session,当浏览器关闭刷新的时候取消订阅,然后关闭线程,但是取消订阅的时候session是不会立刻关闭的,有一个默认事件一分钟自己关闭,如果想要关闭session的话,需要用id,所以我自己传递了sessionid,以便于我关闭。

jsp中的方法(pushlet本身就是用XMLHttpRequest发送的)

  

 var subscriptionId = null;   
onInit(); window.onbeforeunload
= onUnsubscribe; // window.onunload = onUnsubscribe(); // 获取长度为len的随机字符串 function _getRandomString(len) { var len = len || 32; var chars = ‘ABCDEFGHJKMNPQRSTWXYZabcdefhijkmnprstwxyz‘; var maxPos = chars.length; var pwd = ‘‘; for (i = 0; i < len; i++) { pwd += chars.charAt(Math.floor(Math.random() * maxPos)); } return pwd; } // 页面加载完,初始化请求、监听 function onInit() { PL.mySessionId = _getRandomString(32); var userId = ‘<ww:property value="#session.sUser.realName" />‘;//我的项目中的获取userid的方法 var aSubject = userId; //主题名 var httpRequest = getXMLHttpRequest(); if (httpRequest) { httpRequest.onreadystatechange = function() { if (httpRequest.readyState == 4) { if (httpRequest.status == 200) { // 请求成功,起pushlet监听 PL._init(); PL.joinListen(PL.mySessionId ); } else { alert("实时请求失败!\n" + httpRequest.statusText); } } } url = ‘<%=request.getContextPath()%>‘ + ‘/TestServlet‘ + ‘?subject=‘ + aSubject + ‘&thisSessionId=‘ + PL.mySessionId ; httpRequest.open("POST", url, true); httpRequest.send(null); } } // 监听后台返回的数据信息,更新页面 function onData(event) { // 保存订阅编号,用于页面关闭时进行退订 subscriptionId = event.get(‘p_sid‘); console.log(subscriptionId+"=============================================") console.log(event.get("who") +" "+ event.get("num") +" "+ event.get("date")); } // 页面关闭时,取消订阅 function onUnsubscribe() { if (subscriptionId != null) { PL.unsubscribe(subscriptionId); } else {
//什么时候需要这个方法呢,就是比如我设置了五秒一刷新,user登录了不足五秒马上退出,此时线程还在休眠中,根本还没有生成事件,还没有给前台subscriptionid不能取消订阅,此时直接关闭session $.ajax({ type:
‘POST‘, async:false, url:‘personal/delPushLetSession.action?id=‘+ PL.mySessionId, dataType:"json", cache: false, success : function(data) { }, }); } } // 获取http请求 function getXMLHttpRequest() { req = false; //本地XMLHttpRequest对象 if (window.XMLHttpRequest) { try { req = new XMLHttpRequest(); } catch (e) { req = false; } //IE/Windows ActiveX版本 } else if (window.ActiveXObject) { try { req = new ActiveXObject("Msxml2.XMLHTTP"); } catch (e) { try { req = new ActiveXObject("Microsoft.XMLHTTP"); } catch (e) { req = false; } } } return req; }

js中常量和_doRequest方法中分别添加了:

 mySessionId: null, 

 

// Construct base URL for GET
var url = PL.pushletURL + ‘?p_event=‘ + anEvent;
if (anEvent == ‘join‘ || anEvent == ‘join-listen‘) {
url = url + "&mySessionId=" + PL.mySessionId;
}

新建一个serlet类:

package com.css.app.personalmsg.server;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

public class TestServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;
    public TestServlet2() {
        super();
    }
    /**
     * get/post方法的处理函数
     */
    protected void service(HttpServletRequest request,
                           HttpServletResponse response) throws ServletException, IOException {

        // 读取请求报文数据
        request.setCharacterEncoding("UTF-8");
        // 获取订阅主题名称
        String aSubject = request.getParameter("subject");
        String thisSessionId = request.getParameter("thisSessionId");
        // 启动一个线程,实现创建Pushlet事件、做业务、向前台推送数据等功能
        PushThread2 pushThread = new PushThread2(aSubject, thisSessionId);
        pushThread.start();
    }
}

新建一个线程类:

package com.css.app.personalmsg.server;

import nl.justobjects.pushlet.core.Dispatcher;
import nl.justobjects.pushlet.core.Event;
import nl.justobjects.pushlet.core.Session;
import nl.justobjects.pushlet.core.SessionManager;

import java.util.Date;

public class PushThread extends Thread {
    // 主题
    public String aSubject; // 客户端传递过来
    // sessionId
    public String thisSessionId;

    /**
     * 构造函数
     *
     * @param aSubject
     */
    public PushThread2(String aSubject, String thisSessionId) {
        this.aSubject = aSubject;
        this.thisSessionId = thisSessionId;
    }
    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // 线程阻塞,结束线程
                System.out.println("=========>sleep异常 --->" + "线程" + Thread.currentThread().getId() + "关闭");
                break;
            }
            Event event = Event.createDataEvent(thisSessionId);
            System.out.println("\n-----Thread ID: " + Thread.currentThread().getId());
            // 判断当前连接的会话个数,没有会话,则线程退出
            Session session = SessionManager.getInstance().getSession(thisSessionId);
            // 当前无会话,结束线程
            if (session == null) {
                System.out.println("=========>无sessions --->" + "线程" + Thread.currentThread().getId() + "关闭");
                break;
            }
            // 判断当前会话中是否订阅该主题,没有订阅则结束线程
            if (null == session.getSubscriber().match(event)) {
                session.stop();
                System.out.println(thisSessionId+"没有订阅 --->" + "线程"    + Thread.currentThread().getId() + "关闭");
                break;
            }
            // 模拟业务处理:获取各测点的值
           // GetPersonalMsgService msgService = new GetPersonalMsgService(aSubject);
            //int num = msgService.getmsg();
            int num = 2;
           /* str = "传递中文测试!!";*/
           /* try {
                str = URLEncoder.encode(str,"UTF-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }*/
            event.setField("num", num);
            event.setField("who",aSubject);
            event.setField("date",new Date().toString());
            // 推送消息
            Dispatcher.getInstance().multicast(event); //
            System.out.println(new Date()+"发送了消息");

        }
    }
}

最后在webxml中添加

<servlet>
    <servlet-name>myPushlet</servlet-name>
    <servlet-class>com.css.app.personalmsg.server.TestServlet</servlet-class>
    <load-on-startup>3</load-on-startup>
  </servlet>
  <servlet-mapping>
    <servlet-name>myPushlet</servlet-name>
    <url-pattern>/TestServlet</url-pattern>
  </servlet-mapping>
PushletSessionManager中改为:
public class PushletSessionManager extends SessionManager {

    @Override
    public Session createSession(Event anEvent) throws PushletException {
        //Event的getField方法的第二个参数为当传递参数中不存在第一个参数字段时默认使用的值。
        return Session.create(anEvent.getField("mySessionId", "visitor"));
    }
}

最后别忘了在pushlet.properties中找到sessionmanager.class改为我们写的类的路径:

sessionmanager.class=com.css.app.personalmsg.server.PushletSessionManager

大功告成。这样每登录一个用户就会启动一个线程了生成用户id主题的事件,用户退出线程关闭,多用户之间互不影响。有兴趣的自己加入线程池吧。

项目上传github上面了:

 

以上是关于学习Pushlet:pushlet多线程实现的主要内容,如果未能解决你的问题,请参考以下文章

pushlet

Pushlet浏览器长连接通讯

使用Pushlet将消息从服务器端推送到客户端

Pushlet、长轮询或轮询 - 我应该在我的聊天应用程序中使用哪一个?

基于 Pushlets 的消息推送设

WebSocket与消息推送