28.1 延续简介
延续是一种机制用来实现类似于Servlet 3.0异步功能的异步Servlet,但提供了一个简单易操作的接口。
28.1.1 为什么使用异步Servlets
异步servlet的概念往往与异步IO或NIO的使用产生混淆。但是异步Servlets 和异步IO还是有主要不同点:
- HTTP请求通常很小并且位于一个单独的包,Servlets 很少在请求时阻塞。
- 许多responses 通常很小并且大小适合server缓冲,所以servlets 通常不会再写入response时堵塞
- 即便我们能在servlet中使用异步IO,那也将时编程变得更加困难。例如当一个应用程序读到2到3个字节的UTF-8它会怎么做?它不得不缓冲等待更多的字节。这件事最好由容器来做而不是应用程序。
异步servlets 的主要用法是用来等待非IO的事件或资源。许多web应用程序需要等待处理HTTP请求的各种阶段,例如:
- 处理请求前等待资源可用(例如:thread、JDBC连接)
- 在AJAX Comet应用中等待一个应用程序的事件(例如:聊天消息、价格变动)
- 等待远程服务的一个响应(例如:RESTful 、SOAP )
servlet API(2.5之前)仅支持一种同步调用方式,所以servlet 的任何等待都是阻塞式的。不幸的是,这意味着分配给请求的线程将会在等待所有资源的时候被持有:内核线程、栈存储、缓冲池、字符转换器、EE认真context等等。保存对这些资源的等待会浪费大量的系统资源。如果等待是异步进行的,那么可以进行更好的扩展和提高服务质量。
28.1.2 异步Servlets 例子
Web 2.0可以使用comet技术(又叫做 AJAX推送、服务端推送、长轮询)动态更新一个页面,而不需要刷新整个页面。
想获得更多关于comet 的知识,可以阅读comet 项目的与Jetty异步工作章节。
异步RESTful Web Service
假设一个web应用访问一个远程web服务(例如,SOAP service或RESTful service),通常一个远程服务仅花几百毫秒便可产生一个响应,eBay的RESTful web服务通常需要350ms来匹配给定关键字的拍卖列表-虽然仅仅只有少数的10ms的CPU时间来处理本地请求和生成一个响应。
为了每秒处理1000个请求,每一个web服务调用需要200ms,那么一个web应用需要1000*(200+20)/1000 = 220 个线程和110MB内存。如果发生请求风暴时仍会导致线程不足和web服务变慢的情况。如果进行异步处理,那么web应用将不需要持有每一个线程在等待web service响应的时候。及时异步机制需要消耗10ms(通常不消耗),那么web应用将需要1000*(20+10)/1000 = 30 个线程和15MB内存。这将有86%的性能提升,和95MB的内存释放可用于其他地方。而且,如果多重web services请求需要,异步调用将允许并行调用,而不是顺序调用,不需要额外分配线程。
假设一个web应用每秒处理400个应用请求,每个请求需要与数据库交互50ms。为了处理这些请求,平均每秒需要400*50/1000 = 20 个JDBC连接。然而请求经常爆发或者延迟。为了保护访问风暴下的数据库,通常使用数据库连接池来限制连接。所以对于这个应用程序,它将是合理应用JDBC 30连接池,提供50%的额外保证。
28.1.3 Servlet线程模型
Java servlet的可伸缩性能的主要原因是由服务线程模型引起:
传统的Java IO模型,每个TCP/IP 连接与一个线程关联。如果你有一些非常活跃的线程,那么这个模型可以扩展到非常高的每秒请求数。
Java NIO库支持异步IO,这样线程就不需要分配到每个连接上,当连接闲置时(两次请求中间),那么连接将会添加到NIO选择集合,它允许一个线程扫描许多活动连接。只有当IO被检测到输入输出的时候线程才会被分配给它。然而servlet 2.5 API模型仍然需要将一个线程分配给一个请求持用的所有时间内。
Jetty的支持连接API(和 servlet 3.0 异步)在servlet API引入一种改变,就是允许将一个请求多次分配到一个servlet。如果这个servlet没有所需的资源,那么这个servlet将被挂起(或将它放入异步模型),那么这个servlet将会没有任何响应的被返回。当等待的资源可用时,请求将会重新分配到这个servlet,使用一个新的线程,一个响应就会产生。
28.2 使用延续
异步servlet最初是由Jetty的延续机制引进来的,这是Jetty的一个特殊的机制。Jetty7以后,延续的API已经被扩展成一个通用的API,将在任何servlet-3.0容器上进行异步工作,Jetty6,7,8同样支持。延续机制还可以在servlet 2.5 容器下以阻塞方式运行。
28.2.1 获得一个延续
Continuation continuation = ContinuationSupport.getContinuation(request);
28.2.2 挂起一个请求
void doGet(HttpServletRequest request, HttpServletResponse response) { ... // 可选择的: // continuation.setTimeout(long); continuation.suspend(); ... }
请求的生命周期将会从Servlet.service(...) 和Filter.doFilter(...) 的调用延续到容器的返回。当这些调用方法返回时,挂起的请求将不会被提交响应也不会被发送到客户端。
挂起类似于servlet 3.0 的 request.startAsync()方法。不像jetty 6的延续,异常不会被抛出并且方法会正常返回。这允许注册的延续在挂起后发生避免发生互斥。如果一个需要一个异常(不知道延续而绕过代码并试图提交响应),那么continuation.undispatch() 方法将会被调用退出当前线程,并抛出一个ContinuationThrowable异常。
28.2.3 恢复一个请求
void myAsyncCallback(Object results) { continuation.setAttribute("results",results); continuation.resume(); }
当延续被恢复,请求会被重新分配到这个servlet 容器,就像请求重新来过一样。然而在重新分配过程中, continuation.isInitial()方法返回false,所有设置到异步处理器上的参数都是有效的。
延续的恢复类似于Servlet 3.0 的 AsyncContext.dispatch()方法。
28.2.4 完成一个请求
void myAsyncCallback(Object results) { writeResults(continuation.getServletResponse(),results); continuation.complete(); }
当完成方法被调用,容器会安排响应提交并刷新。延续的完成类似于Servlet 3.0 的 AsyncContext.complete()。
28.2.5 延续的监听
void doGet(HttpServletRequest request, HttpServletResponse response) { ... Continuation continuation = ContinuationSupport.getContinuation(request); continuation.addContinuationListener(new ContinuationListener() { public void onTimeout(Continuation continuation) { ... } public void onComplete(Continuation continuation) { ... } }); continuation.suspend(); ... }
延续的监听类似于Servlet 3.0 的 AsyncListeners。
28.3 常用延续模式
28.3.1 暂停恢复模式
暂停/恢复模式用在当一个servlet 或 一个filter用来产生一个响应,在被异步处理器中断并恢复后。一般请求的属性用来传递结果,用来表明请求已经被暂停。
void doGet(HttpServletRequest request, HttpServletResponse response) { // 如果我们需要获得异步的结果 Object results = request.getAttribute("results"); if (results==null) { final Continuation continuation = ContinuationSupport.getContinuation(request); // 如果没有超时 if (continuation.isExpired()) { sendMyTimeoutResponse(response); return; } // 恢复request continuation.suspend(); // 注册前一直被暂停 // 被异步服务注册,此处的代码将被服务调用 myAsyncHandler.register(new MyHandler() { public void onMyEvent(Object result) { continuation.setAttribute("results",results); continuation.resume(); } }); return; // 或者continuation.undispatch(); } // 发送结果 sendMyResultResponse(response,results); }
28.3.2 暂停继续模式
void doGet(HttpServletRequest request, HttpServletResponse response) { final Continuation continuation = ContinuationSupport.getContinuation(request); // 如果没有超时 if (continuation.isExpired()) { sendMyTimeoutResponse(request,response); return; } // 将请求挂起 continuation.suspend(); // response 可能被包装 // r注册给异步服务,代码将被服务调用 myAsyncHandler.register(new MyHandler() { public void onMyEvent(Object result) { sendMyResultResponse(continuation.getServletResponse(),results); continuation.complete(); } }); }
28.3.3 示例

// // ======================================================================== // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. // ------------------------------------------------------------------------ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 // and Apache License v2.0 which accompanies this distribution. // // The Eclipse Public License is available at // // // The Apache License v2.0 is available at // // // You may elect to redistribute this code under either of these licenses. // ======================================================================== // package com.acme; import; import; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Queue; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.continuation.Continuation; import org.eclipse.jetty.continuation.ContinuationSupport; // 简单的异步聊天室 // 这不处理重复的用户名和同一个浏览器的标签情况 // 一些代码是重复的 public class ChatServlet extends HttpServlet { // 内部类用来保存每个成员的消息队列 class Member { String _name; Continuation _continuation; Queue<String> _queue = new LinkedList<String>(); } Map<String,Map<String,Member>> _rooms = new HashMap<String,Map<String, Member>>(); // 处理浏览器的AJAX调用 @Override protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // Ajax调用的编码形式 String action = request.getParameter("action"); String message = request.getParameter("message"); String username = request.getParameter("user"); if (action.equals("join")) join(request,response,username); else if (action.equals("poll")) poll(request,response,username); else if (action.equals("chat")) chat(request,response,username,message); } private synchronized void join(HttpServletRequest request,HttpServletResponse response,String username) throws IOException { Member member = new Member(); member._name=username; Map<String,Member> room=_rooms.get(request.getPathInfo()); if (room==null) { room=new HashMap<String,Member>(); _rooms.put(request.getPathInfo(),room); } room.put(username,member); response.setContentType("text/json;charset=utf-8"); PrintWriter out=response.getWriter(); out.print("{action:\\"join\\"}"); } private synchronized void poll(HttpServletRequest request,HttpServletResponse response,String username) throws IOException { Map<String,Member> room=_rooms.get(request.getPathInfo()); if (room==null) { response.sendError(503); return; } Member member = room.get(username); if (member==null) { response.sendError(503); return; } synchronized(member) { if (member._queue.size()>0) { // 发送一个聊天消息 response.setContentType("text/json;charset=utf-8"); StringBuilder buf=new StringBuilder(); buf.append("{\\"action\\":\\"poll\\","); buf.append("\\"from\\":\\""); buf.append(member._queue.poll()); buf.append("\\","); String message = member._queue.poll(); int quote=message.indexOf(\'"\'); while (quote>=0) { message=message.substring(0,quote)+\'\\\\\'+message.substring(quote); quote=message.indexOf(\'"\',quote+2); } buf.append("\\"chat\\":\\""); buf.append(message); buf.append("\\"}"); byte[] bytes = buf.toString().getBytes("utf-8"); response.setContentLength(bytes.length); response.getOutputStream().write(bytes); } else { Continuation continuation = ContinuationSupport.getContinuation(request); if (continuation.isInitial()) { // 没有消息时,挂起等待聊天或超时 continuation.setTimeout(20000); continuation.suspend(); member._continuation=continuation; } else { // 超时后发送空的响应 response.setContentType("text/json;charset=utf-8"); PrintWriter out=response.getWriter(); out.print("{action:\\"poll\\"}"); } } } } private synchronized void chat(HttpServletRequest request,HttpServletResponse response,String username,String message) throws IOException { Map<String,Member> room=_rooms.get(request.getPathInfo()); if (room!=null) { // 推送消息到所有成员 for (Member m:room.values()) { synchronized (m) { m._queue.add(username); // from m._queue.add(message); // chat // 如果轮询到则唤醒 if (m._continuation!=null) { m._continuation.resume(); m._continuation=null; } } } } response.setContentType("text/json;charset=utf-8"); PrintWriter out=response.getWriter(); out.print("{action:\\"chat\\"}"); } // 提供嵌入css和js的html服务 // 这应该是静态内容和真正使用的js库 @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { if (request.getParameter("action")!=null) doPost(request,response); else getServletContext().getNamedDispatcher("default").forward(request,response); } }
这个ChatServlet 例子,展示了挂起/恢复模式被用来建立一个聊天室(使用了异步servlet)。相同的原则将应用于cometd这样的框架,为这样的应用提供一个基于延续的丰富的环境。

// // ======================================================================== // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. // ------------------------------------------------------------------------ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 // and Apache License v2.0 which accompanies this distribution. // // The Eclipse Public License is available at // // // The Apache License v2.0 is available at // // // You may elect to redistribute this code under either of these licenses. // ======================================================================== // package org.eclipse.jetty.servlets; import; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; /** * Quality of Service Filter. * <p> * This filter limits the number of active requests to the number set by the "maxRequests" init parameter (default 10). * If more requests are received, they are suspended and placed on priority queues. Priorities are determined by * the {@link #getPriority(ServletRequest)} method and are a value between 0 and the value given by the "maxPriority" * init parameter (default 10), with higher values having higher priority. * <p> * This filter is ideal to prevent wasting threads waiting for slow/limited * resources such as a JDBC connection pool. It avoids the situation where all of a * containers thread pool may be consumed blocking on such a slow resource. * By limiting the number of active threads, a smaller thread pool may be used as * the threads are not wasted waiting. Thus more memory may be available for use by * the active threads. * <p> * Furthermore, this filter uses a priority when resuming waiting requests. So that if * a container is under load, and there are many requests waiting for resources, * the {@link #getPriority(ServletRequest)} method is used, so that more important * requests are serviced first. For example, this filter could be deployed with a * maxRequest limit slightly smaller than the containers thread pool and a high priority * allocated to admin users. Thus regardless of load, admin users would always be * able to access the web application. * <p> * The maxRequest limit is policed by a {@link Semaphore} and the filter will wait a short while attempting to acquire * the semaphore. This wait is controlled by the "waitMs" init parameter and allows the expense of a suspend to be * avoided if the semaphore is shortly available. If the semaphore cannot be obtained, the request will be suspended * for the default suspend period of the container or the valued set as the "suspendMs" init parameter. * <p> * If the "managedAttr" init parameter is set to true, then this servlet is set as a {@link ServletContext} attribute with the * filter name as the attribute name. This allows context external mechanism (eg JMX via {@link ContextHandler#MANAGED_ATTRIBUTES}) to * manage the configuration of the filter. */ @ManagedObject("Quality of Service Filter") public class QoSFilter implements Filter { private static final Logger LOG = Log.getLogger(QoSFilter.class); static final int __DEFAULT_MAX_PRIORITY = 10; static final int __DEFAULT_PASSES = 10; static final int __DEFAULT_WAIT_MS = 50; static final long __DEFAULT_TIMEOUT_MS = -1; static final String MANAGED_ATTR_INIT_PARAM = "managedAttr"; static final String MAX_REQUESTS_INIT_PARAM = "maxRequests"; static final String MAX_PRIORITY_INIT_PARAM = "maxPriority"; static final String MAX_WAIT_INIT_PARAM = "waitMs"; static final String SUSPEND_INIT_PARAM = "suspendMs"; private final String _suspended = "QoSFilter@" + Integer.toHexString(hashCode()) + ".SUSPENDED"; private final String _resumed = "QoSFilter@" + Integer.toHexString(hashCode()) + ".RESUMED"; private long _waitMs; private long _suspendMs; private int _maxRequests; private Semaphore _passes; private Queue<AsyncContext>[] _queues; private AsyncListener[] _listeners; public void init(FilterConfig filterConfig) { int max_priority = __DEFAULT_MAX_PRIORITY; if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM) != null) max_priority = Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)); _queues = new Queue[max_priority + 1]; _listeners = new AsyncListener[_queues.length]; for (int p = 0; p < _queues.length; ++p) { _queues[p] = new ConcurrentLinkedQueue<>(); _listeners[p] = new QoSAsyncListener(p); } int maxRequests = __DEFAULT_PASSES; if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM) != null) maxRequests = Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)); _passes = new Semaphore(maxRequests, true); _maxRequests = maxRequests; long wait = __DEFAULT_WAIT_MS; if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM) != null) wait = Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)); _waitMs = wait; long suspend = __DEFAULT_TIMEOUT_MS; if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM) != null) suspend = Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM)); _suspendMs = suspend; ServletContext context = filterConfig.getServletContext(); if (context != null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM))) context.setAttribute(filterConfig.getFilterName(), this); } public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { boolean accepted = false; try { Boolean suspended = (Boolean)request.getAttribute(_suspended); if (suspended == null) { accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS); if (accepted) { request.setAttribute(_suspended, Boolean.FALSE); if (LOG.isDebugEnabled()) LOG.debug("Accepted {}", request); } else { request.setAttribute(_suspended, Boolean.TRUE); int priority = getPriority(request); AsyncContext asyncContext = request.startAsync(); long suspendMs = getSuspendMs(); if (suspendMs > 0) asyncContext.setTimeout(suspendMs); asyncContext.addListener(_listeners[priority]); _queues[priority].add(asyncContext); if (LOG.isDebugEnabled()) LOG.debug("Suspended {}", request); return; } } else { if (suspended) { request.setAttribute(_suspended, Boolean.FALSE); Boolean resumed = (Boolean)request.getAttribute(_resumed); if (resumed == Boolean.TRUE) { _passes.acquire(); accepted = true; if (LOG.isDebugEnabled()) LOG.debug("Resumed {}", request); } else { // Timeout! try 1 more time. accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS); if (LOG.isDebugEnabled()) LOG.debug("Timeout {}", request); } } else { // Pass through resume of previously accepted request. _passes.acquire(); accepted = true; if (LOG.isDebugEnabled()) LOG.debug("Passthrough {}", request); } } if (accepted) { chain.doFilter(request, response); } else { if (LOG.isDebugEnabled()) LOG.debug("Rejected {}", request); ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); } } catch (InterruptedException e) { ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); } finally { if (accepted) { for (int p = _queues.length - 1; p >= 0; --p) { AsyncContext asyncContext = _queues[p].poll(); if (asyncContext != null) { ServletRequest candidate = asyncContext.getRequest(); Boolean suspended = (Boolean)candidate.getAttribute(_suspended); if (suspended == Boolean.TRUE) { candidate.setAttribute(_resumed, Boolean.TRUE); asyncContext.dispatch(); break; } } } _passes.release(); } } } /** * Computes the request priority. * <p> * The default implementation assigns the following priorities: * <ul> * <li> 2 - for an authenticated request * <li> 1 - for a request with valid / non new session * <li> 0 - for all other requests. * </ul> * This method may be overridden to provide application specific priorities. * * @param request the incoming request * @return the computed request priority */ protected int getPriority(ServletRequest request) { HttpServletRequest baseRequest = (HttpServletRequest)request; if (baseRequest.getUserPrincipal() != null) { return 2; } else { HttpSession session = baseRequest.getSession(false); if (session != null && !session.isNew()) return 1; else return 0; } } public void destroy() { } /** * Get the (short) amount of time (in milliseconds) that the filter would wait * for the semaphore to become available before suspending a request. * * @return wait time (in milliseconds) */ @ManagedAttribute("(short) amount of time filter will wait before suspending request (in ms)") public long getWaitMs() { return _waitMs; } /** * Set the (short) amount of time (in milliseconds) that the filter would wait * for the semaphore to become available before suspending a request. * *以上是关于Jetty使用教程(四:28-30)—Jetty开发指南的主要内容,如果未能解决你的问题,请参考以下文章