使用jetty的continuations实现"服务器推"
Posted 梦中彩虹
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用jetty的continuations实现"服务器推"相关的知识,希望对你有一定的参考价值。
在实际的开发中,我们可能会有这样的场景:许多客户端都连接到服务器端,当有某个客户端的消息的时候,服务器端会主动"推"消息给客户端,手机app的推送是一个典型的场景(ios的推送都是要经过苹果的服务器的,一般是通过苹果的APNS服务来实现,不需要做过多的开发,安卓的推送就需要我们自己来实现了)
我们可选的技术方案实际上是很多的,使用netty这样的异步的网络通信框架或者servlet容器提供的异步的方案都是可以实现的,它们的理念都是一样的,异步和事件驱动,客户端请求服务器,当服务器没有需要推送的数据(或者是需要执行很长时间的IO操作)的时候,请求会被挂起,当服务器端的数据准备好的时候(例如需要向客户端推送一个消息的时候,或者是服务器端IO操作执行完毕了)请求会被重新激活,数据返回客户端.
使用jetty的continuations或者是netty来实现这两种是我觉得比较好的实现方案,今天介绍一下如何使用jetty的continuations来实现一个服务器推的原型,和正式环境中向安卓手机的推送的实现方法是完全一样的
continuations介绍:jetty的continuations是jetty实现的实现异步请求和事件驱动的组件,从jetty7起,continuations不止在jetty中可以使用,任何支持servlet3.0规范的servlet容器都可以使用continuations来实现异步和事件驱动,相比servlet3.0规范中的异步servlet,continuations提供了更加简化的编程模型.
目标:用浏览器请求服务器的一个URL(用浏览器来模拟我们的客户端),实现任何时候当服务器需要推送数据的时候,浏览器能够立即显示出来
我们需要提供两个接口:提供给客户端做长连接的接口,向客户端发送数据的接口
提供给客户端连接的servlet:
package com.jiaoyiping.websample.asyncServlet.jetty; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/23 * Time: 23:52 * To change this template use File | Settings | Editor | File and Code Templates */ import org.eclipse.jetty.continuation.Continuation; import org.eclipse.jetty.continuation.ContinuationListener; import org.eclipse.jetty.continuation.ContinuationSupport; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.util.Map; @WebServlet(urlPatterns = "/pull", asyncSupported = true) public class ContinuationServlet extends HttpServlet { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { String user = req.getParameter("user"); Map<String, PushAgent> pushAgentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap"); if (pushAgentMap.containsKey(user)) { PushAgent pushAgent = pushAgentMap.get(user); Continuation continuation = ContinuationSupport.getContinuation(req); continuation.setTimeout(90000000); //第一次请求进来 if (continuation.isInitial()) { resp.setContentType("text/evf;charset=utf-8"); resp.setHeader("Connection", "keep-alive"); resp.setHeader("Keep-Alive", "timeout=2000"); PushAdapter pushAdapter = new PushAdapter(continuation, pushAgent); continuation.setAttribute("adapter", pushAdapter); continuation.addContinuationListener(new ContinuationListener() { @Override public void onComplete(Continuation continuation) { PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter"); if (null != adapter) { continuation.setAttribute("adapter", null); } } @Override public void onTimeout(Continuation continuation) { onComplete(continuation); } }); resp.flushBuffer(); } if (continuation.isExpired()) { return; } Writer writer = getWriter(resp); PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter"); Message message; while (true) { message = adapter.getPushAgent().pull(); if (null == message) break; try { writer.write(message.getContent()); writer.flush(); writer.write("\\r\\n"); writer.flush(); resp.flushBuffer(); } catch (Exception e) { throw e; } } //若没有该客户端的消息,则请求被挂起 continuation.suspend(); } } private Writer getWriter(HttpServletResponse response) throws IOException { OutputStream os = response.getOutputStream(); return new OutputStreamWriter(os, "UTF-8"); } }
向客户端推送消息的servlet:
package com.jiaoyiping.websample.asyncServlet.jetty; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter; import java.util.Map; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/25 * Time: 23:46 * To change this template use File | Settings | Editor | File and Code Templates */ @WebServlet(urlPatterns = "/send") public class MesssageSendServlet extends HttpServlet { @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { //不要在自己实现的servlet中调用 super.doGet(0)或者是super.doPost() //因为在tomcat它们的默认实现是报405(HTTP1.1)或者400(其他版本的HTTP) // super.doPost(req, resp); String target = req.getParameter("target"); String messageStr = req.getParameter("message"); Map<String, PushAgent> agentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap"); if (agentMap.keySet().contains(target)) { Message message = new Message(); message.setTarget(target); message.setContent(messageStr); if (agentMap.get(target).isInited()) { agentMap.get(target).onEvent(message); } agentMap.get(target).send(message); PrintWriter out = resp.getWriter(); out.print("发送成功"); out.flush(); out.close(); } } @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { this.doPost(req, resp); } }
推送代理:就是可以拿到客户端相关信息,并且维护客户端消息队列的类,在这个推送代理中,我们可以加入一个监听器,当有数据需要推送的时候,激活请求
package com.jiaoyiping.websample.asyncServlet.jetty; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/25 * Time: 22:56 * To change this template use File | Settings | Editor | File and Code Templates */ public interface PushAgent { Terminal getTerminal(); String getAddress(); String getToken(); Message send(Message message); Message pull(); void addListener(MessageListener messageListener); void onEvent(Message message); boolean isInited(); }
默认实现(每个需要接受推送的用户对应一个PushAgent,和用户端保持长连接的线程从queue里读取mesage对象,向某个用户推送的时候将message对象放到该用户对应的PushAgent的queue里,这里是一个生产者-消费者模式):
package com.jiaoyiping.websample.asyncServlet.jetty; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/25 * Time: 23:17 * To change this template use File | Settings | Editor | File and Code Templates */ import java.util.PriorityQueue; import java.util.Queue; public class DefaultPushAgent implements PushAgent { private Terminal terminal; //客户端通过长连接连接到服务器时,服务器不断地从该队列poll(),若果拿到新的消息,则返回给客户端 private Queue<Message> messages = new PriorityQueue<>(); private MessageListener messageListener; @Override public Terminal getTerminal() { return this.terminal; } @Override public String getAddress() { return null; } @Override public String getToken() { return null; } @Override public Message send(Message message) { synchronized (message) { messages.add(message); } return message; } @Override public Message pull() { synchronized (messages) { return messages.poll(); } } @Override public void addListener(MessageListener messageListener) { this.messageListener = messageListener; } @Override public void onEvent(Message message) { this.messageListener.onMessage(message); } @Override public boolean isInited() { return this.messageListener != null; } public DefaultPushAgent(Terminal terminal) { this.terminal = terminal; } }
PushAdapter的实现(用户将Continuation和PushAgent关联起来):
package com.jiaoyiping.websample.asyncServlet.jetty; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/25 * Time: 23:37 * To change this template use File | Settings | Editor | File and Code Templates */ import org.eclipse.jetty.continuation.Continuation; public class PushAdapter { private Continuation continuation; private PushAgent pushAgent; public PushAdapter(Continuation continuation, PushAgent pushAgent) { this.continuation = continuation; this.pushAgent = pushAgent; this.pushAgent.addListener(message -> { if (PushAdapter.this.continuation.isSuspended()) { PushAdapter.this.continuation.resume(); } }); } public Continuation getContinuation() { return continuation; } public void setContinuation(Continuation continuation) { this.continuation = continuation; } public PushAgent getPushAgent() { return pushAgent; } public void setPushAgent(PushAgent pushAgent) { this.pushAgent = pushAgent; } }
MessageListener的实现(监听需要推送消息的事件,这里为了做演示,并没有实现一个完整的观察者模式,只是在需要推送消息的时候,手工调用 onMessage()):
package com.jiaoyiping.websample.asyncServlet.jetty; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/26 * Time: 2:03 * To change this template use File | Settings | Editor | File and Code Templates */ public interface MessageListener { void onMessage(Message message); }
测试数据:使用一个listener在应用初始化的时候,初始化一些数据做为测试数据
package com.jiaoyiping.websample.asyncServlet.jetty; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import javax.servlet.annotation.WebListener; import java.util.HashMap; import java.util.Map; /* * Created with Intellij IDEA * USER: 焦一平 * Mail: jiaoyiping@gmail.com * Date: 2016/10/25 * Time: 23:55 * To change this template use File | Settings | Editor | File and Code Templates */ @WebListener public class PushListener implements ServletContextListener { @Override public void contextInitialized(ServletContextEvent sce) { Map<String, PushAgent> agentMap = new HashMap<>(); agentMap.put("zhangsan", new DefaultPushAgent(new Terminal() {{ setAddress("zhangsan"); setToken("zhangsan_token"); }})); agentMap.put("lisi", new DefaultPushAgent(new Terminal() {{ setAddress("lisi"); setToken("lisi_token"); }})); sce.getServletContext().setAttribute("agentmap",agentMap); } @Override public void contextDestroyed(ServletContextEvent sce) { } }
最终的效果是这样的,我截了一个git图:
以上是关于使用jetty的continuations实现"服务器推"的主要内容,如果未能解决你的问题,请参考以下文章
Python入门-4控制语句:08循环中的break-continue-else
Discrete VS Continuous Control
Discrete VS Continuous Control
centos7 安装后,意外出现Please make your choice from above ['q' to quit | 'c' to continue |