Jetty使用教程(四:28-30)—Jetty开发指南

Posted 已往之不谏

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Jetty使用教程(四:28-30)—Jetty开发指南相关的知识,希望对你有一定的参考价值。

二十八、延续机制支持

28.1 延续简介

  延续是一种机制用来实现类似于Servlet 3.0异步功能的异步Servlet,但提供了一个简单易操作的接口。

28.1.1 为什么使用异步Servlets 

不使用异步IO:

  异步servlet的概念往往与异步IO或NIO的使用产生混淆。但是异步Servlets 和异步IO还是有主要不同点:

    • HTTP请求通常很小并且位于一个单独的包,Servlets 很少在请求时阻塞。
    • 许多responses 通常很小并且大小适合server缓冲,所以servlets 通常不会再写入response时堵塞
    • 即便我们能在servlet中使用异步IO,那也将时编程变得更加困难。例如当一个应用程序读到2到3个字节的UTF-8它会怎么做?它不得不缓冲等待更多的字节。这件事最好由容器来做而不是应用程序。

异步等待:

  异步servlets 的主要用法是用来等待非IO的事件或资源。许多web应用程序需要等待处理HTTP请求的各种阶段,例如:

    • 处理请求前等待资源可用(例如:thread、JDBC连接)
    • 在AJAX Comet应用中等待一个应用程序的事件(例如:聊天消息、价格变动)
    • 等待远程服务的一个响应(例如:RESTful 、SOAP )

  servlet API(2.5之前)仅支持一种同步调用方式,所以servlet 的任何等待都是阻塞式的。不幸的是,这意味着分配给请求的线程将会在等待所有资源的时候被持有:内核线程、栈存储、缓冲池、字符转换器、EE认真context等等。保存对这些资源的等待会浪费大量的系统资源。如果等待是异步进行的,那么可以进行更好的扩展和提高服务质量。

28.1.2 异步Servlets 例子

AJAX服务端推送

  Web 2.0可以使用comet技术(又叫做 AJAX推送、服务端推送、长轮询)动态更新一个页面,而不需要刷新整个页面。

  考虑一个股票投资的web应用程序。每个浏览器都会发一个长轮询到服务器,要求服务器提供任何用户股票价格的变动。服务器将从它所有的客户端接收到长轮询请求,并且不会立即进行响应。服务器将等待股票价格的变动,在那时它将发送一个响应到所有等待着的客户端。客户端接收到长轮询响应后会立即再次发送一个长轮询来获得未来价格的改变。

  这样的话,服务器将持有每一个用户连接的长轮询,所以如果servlet不是异步的话,那么至少需要1000个线程来持有同时1000各用户。1000个线程会消耗256M资源;这些资源最好被应用所使用而不是进行无意义的等待。

  如果servlet是异步的话,那么需要的线程数量将由生成相应的时间和价格变化的频率所决定。如果每个用户每10秒接收一次请求,响应需要10ms来生成,那么1000个用户仅仅需要一个线程来提供服务,256M的栈资源将被释放用于其他用途。

  想获得更多关于comet 的知识,可以阅读comet 项目的与Jetty异步工作章节。

异步RESTful Web Service

  假设一个web应用访问一个远程web服务(例如,SOAP service或RESTful service),通常一个远程服务仅花几百毫秒便可产生一个响应,eBay的RESTful  web服务通常需要350ms来匹配给定关键字的拍卖列表-虽然仅仅只有少数的10ms的CPU时间来处理本地请求和生成一个响应。

  为了每秒处理1000个请求,每一个web服务调用需要200ms,那么一个web应用需要1000*(200+20)/1000 = 220 个线程和110MB内存。如果发生请求风暴时仍会导致线程不足和web服务变慢的情况。如果进行异步处理,那么web应用将不需要持有每一个线程在等待web service响应的时候。及时异步机制需要消耗10ms(通常不消耗),那么web应用将需要1000*(20+10)/1000 = 30 个线程和15MB内存。这将有86%的性能提升,和95MB的内存释放可用于其他地方。而且,如果多重web services请求需要,异步调用将允许并行调用,而不是顺序调用,不需要额外分配线程。

  这有一个Jetty的异步解决方案,查看例子

服务质量(例如,JDBC连接池)

  假设一个web应用每秒处理400个应用请求,每个请求需要与数据库交互50ms。为了处理这些请求,平均每秒需要400*50/1000 = 20 个JDBC连接。然而请求经常爆发或者延迟。为了保护访问风暴下的数据库,通常使用数据库连接池来限制连接。所以对于这个应用程序,它将是合理应用JDBC 30连接池,提供50%的额外保证。

  如果请求在某一时刻翻倍,那么30个连接将不能每秒处理600个请求,那么每秒200个请求将要等待连接池分配连接。如果一个servlet容器有一个200个线程的线程池,那么所有的线程将要等待连接池1秒钟。1秒过后,web应用将不能处理任何请求,因为所有的线程都不可用,即便请求不是用数据库。加倍线程池数量需要额外的100MB内存,而只会给应用程序另外1mc的负载恩典。

  这个线程饥饿状况也会发生如果数据库运行缓慢或暂时不可用。线程饥饿是频发的问题,并导致整个web服务来锁定和变得反应迟钝。如果web容器能够不使用线程来暂停等待一个JDBC连接请求,那么线程饥饿就不会发生,因为只有30个线程被用来访问数据库,而其他470个线程用于处理请求,不访问数据库。

  Jetty的解决方案的一个示例,请参阅Server过滤器的质量。

28.1.3 Servlet线程模型

  Java servlet的可伸缩性能的主要原因是由服务线程模型引起:

每连接一个线程

  传统的Java IO模型,每个TCP/IP 连接与一个线程关联。如果你有一些非常活跃的线程,那么这个模型可以扩展到非常高的每秒请求数。

  然而,许多web应用程序的典型特性是许多持久的HTTP连接等待用户阅读完网页,或者搜索下一个点击的连接。这样的配置,thread-per-connection模型将会有成千的线程需要支持成千的用户的大规模部署问题。

每请求一个线程

  Java NIO库支持异步IO,这样线程就不需要分配到每个连接上,当连接闲置时(两次请求中间),那么连接将会添加到NIO选择集合,它允许一个线程扫描许多活动连接。只有当IO被检测到输入输出的时候线程才会被分配给它。然而servlet 2.5 API模型仍然需要将一个线程分配给一个请求持用的所有时间内。

  这种thread-per-request模型允许更大比例的连接(用户)由于更少的调度延时导致的每秒请求数的减少。

异步请求处理

  Jetty的支持连接API(和 servlet 3.0 异步)在servlet API引入一种改变,就是允许将一个请求多次分配到一个servlet。如果这个servlet没有所需的资源,那么这个servlet将被挂起(或将它放入异步模型),那么这个servlet将会没有任何响应的被返回。当等待的资源可用时,请求将会重新分配到这个servlet,使用一个新的线程,一个响应就会产生。

28.2 使用延续

  异步servlet最初是由Jetty的延续机制引进来的,这是Jetty的一个特殊的机制。Jetty7以后,延续的API已经被扩展成一个通用的API,将在任何servlet-3.0容器上进行异步工作,Jetty6,7,8同样支持。延续机制还可以在servlet 2.5 容器下以阻塞方式运行。

28.2.1 获得一个延续

   ContinuationSupport工厂可以通过request来获得一个延续实例:

Continuation continuation = ContinuationSupport.getContinuation(request);

28.2.2 挂起一个请求

  为了挂起一个请求,挂起方法可以在延续上被调用:

    void doGet(HttpServletRequest request, HttpServletResponse response)
      {
      ...
      // 可选择的:
      // continuation.setTimeout(long);
      continuation.suspend();
      ...
      }

  请求的生命周期将会从Servlet.service(...) 和Filter.doFilter(...) 的调用延续到容器的返回。当这些调用方法返回时,挂起的请求将不会被提交响应也不会被发送到客户端。

  一旦请求被挂起,延续将会注册一个异步服务,这样等待的事件发生时异步服务将会被调用。

  请求将会被挂起,直到continuation.resume()或continuation.complete()方法被调用。如果两个都没有被调用那么延续将会被超时。超时应该设置在挂起之前,通过continuation.setTimeout(long)这个方法,如果没有被设置,那么将使用默认的时间。如果没有超时监听着挂起延续,或者完成这个延续,那么调用continuation.isExpired()返回true。

  挂起类似于servlet 3.0 的 request.startAsync()方法。不像jetty 6的延续,异常不会被抛出并且方法会正常返回。这允许注册的延续在挂起后发生避免发生互斥。如果一个需要一个异常(不知道延续而绕过代码并试图提交响应),那么continuation.undispatch() 方法将会被调用退出当前线程,并抛出一个ContinuationThrowable异常。

28.2.3 恢复一个请求

  一旦异步事件发生,那么延续将被恢复:

    void myAsyncCallback(Object results)
    {
    continuation.setAttribute("results",results);
    continuation.resume();
    }

  当延续被恢复,请求会被重新分配到这个servlet 容器,就像请求重新来过一样。然而在重新分配过程中, continuation.isInitial()方法返回false,所有设置到异步处理器上的参数都是有效的。

  延续的恢复类似于Servlet 3.0 的 AsyncContext.dispatch()方法。

28.2.4 完成一个请求

  当重新恢复一个请求时,异步处理器可能会生成响应,在写入响应后,处理器必须显示的通过调用方法表明延续完成了。

    void myAsyncCallback(Object results)
    {
      writeResults(continuation.getServletResponse(),results);
      continuation.complete();
    }

  当完成方法被调用,容器会安排响应提交并刷新。延续的完成类似于Servlet 3.0 的 AsyncContext.complete()。

28.2.5 延续的监听

  一个应用有可能通过ContinuationListener监听延续的各个状态。

    void doGet(HttpServletRequest request, HttpServletResponse response)
    {
      ...

      Continuation continuation = ContinuationSupport.getContinuation(request);
      continuation.addContinuationListener(new ContinuationListener()
      {
        public void onTimeout(Continuation continuation) { ... }
        public void onComplete(Continuation continuation) { ... }
      });

      continuation.suspend();
      ...
    }

  延续的监听类似于Servlet 3.0 的 AsyncListeners。

28.3 常用延续模式

28.3.1 暂停恢复模式

  暂停/恢复模式用在当一个servlet 或 一个filter用来产生一个响应,在被异步处理器中断并恢复后。一般请求的属性用来传递结果,用来表明请求已经被暂停。

void doGet(HttpServletRequest request, HttpServletResponse response)
{
     // 如果我们需要获得异步的结果
     Object results = request.getAttribute("results");
     if (results==null)
     {
       final Continuation continuation = ContinuationSupport.getContinuation(request);

       // 如果没有超时
       if (continuation.isExpired())
       {
         sendMyTimeoutResponse(response);
         return;
       }

       // 恢复request
       continuation.suspend(); // 注册前一直被暂停

       // 被异步服务注册,此处的代码将被服务调用
       myAsyncHandler.register(new MyHandler()
       {
          public void onMyEvent(Object result)
          {
            continuation.setAttribute("results",results);
            continuation.resume();
          }
       });
       return; // 或者continuation.undispatch();
     }

     // 发送结果
     sendMyResultResponse(response,results);
}

  这是一个非常好的模式即当响应需要servlet容器的工具(如,使用了一个web框架)或者一个事件将恢复多个请求导致容器的线程池被多个处理器使用。

28.3.2 暂停继续模式

  暂停/完成模式是用来当一个异步处理器被用来生成一个响应的情况:

void doGet(HttpServletRequest request, HttpServletResponse response)
{
     final Continuation continuation = ContinuationSupport.getContinuation(request);

     // 如果没有超时
     if (continuation.isExpired())
     {
       sendMyTimeoutResponse(request,response);
       return;
     }

     // 将请求挂起
     continuation.suspend(); // response 可能被包装

     // r注册给异步服务,代码将被服务调用
     myAsyncHandler.register(new MyHandler()
     {
       public void onMyEvent(Object result)
       {
         sendMyResultResponse(continuation.getServletResponse(),results);
         continuation.complete();
       }
     });
}

  这种模式在以下这种情况下是非常好的,即响应不需要容器的工具(如使用一种web框架)并且事件将会恢复一次延续。如果多个响应将被发送(例如,聊天室),那么写一次响应将会阻塞并在其他响应上导致一个DOS。

28.3.3 示例

//
//  ========================================================================
//  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package com.acme;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;


// 简单的异步聊天室
// 这不处理重复的用户名和同一个浏览器的标签情况
// 一些代码是重复的
public class ChatServlet extends HttpServlet
{
    
    // 内部类用来保存每个成员的消息队列
    class Member
    {
        String _name;
        Continuation _continuation;
        Queue<String> _queue = new LinkedList<String>();
    }

    Map<String,Map<String,Member>> _rooms = new HashMap<String,Map<String, Member>>();
    
    
    // 处理浏览器的AJAX调用
    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
    {   
        // Ajax调用的编码形式
        String action = request.getParameter("action");
        String message = request.getParameter("message");
        String username = request.getParameter("user");

        if (action.equals("join"))
            join(request,response,username);
        else if (action.equals("poll"))
            poll(request,response,username);
        else if (action.equals("chat"))
            chat(request,response,username,message);
    }

    private synchronized void join(HttpServletRequest request,HttpServletResponse response,String username)
    throws IOException
    {
        Member member = new Member();
        member._name=username;
        Map<String,Member> room=_rooms.get(request.getPathInfo());
        if (room==null)
        {
            room=new HashMap<String,Member>();
            _rooms.put(request.getPathInfo(),room);
        }
        room.put(username,member); 
        response.setContentType("text/json;charset=utf-8");
        PrintWriter out=response.getWriter();
        out.print("{action:\\"join\\"}");
    }

    private synchronized void poll(HttpServletRequest request,HttpServletResponse response,String username)
    throws IOException
    {
        Map<String,Member> room=_rooms.get(request.getPathInfo());
        if (room==null)
        {
            response.sendError(503);
            return;
        }
        Member member = room.get(username);
        if (member==null)
        {
            response.sendError(503);
            return;
        }

        synchronized(member)
        {
            if (member._queue.size()>0)
            {
                // 发送一个聊天消息
                response.setContentType("text/json;charset=utf-8");
                StringBuilder buf=new StringBuilder();

                buf.append("{\\"action\\":\\"poll\\",");
                buf.append("\\"from\\":\\"");
                buf.append(member._queue.poll());
                buf.append("\\",");

                String message = member._queue.poll();
                int quote=message.indexOf(\'"\');
                while (quote>=0)
                {
                    message=message.substring(0,quote)+\'\\\\\'+message.substring(quote);
                    quote=message.indexOf(\'"\',quote+2);
                }
                buf.append("\\"chat\\":\\"");
                buf.append(message);
                buf.append("\\"}");
                byte[] bytes = buf.toString().getBytes("utf-8");
                response.setContentLength(bytes.length);
                response.getOutputStream().write(bytes);
            }
            else 
            {
                Continuation continuation = ContinuationSupport.getContinuation(request);
                if (continuation.isInitial()) 
                {
                    // 没有消息时,挂起等待聊天或超时
                    continuation.setTimeout(20000);
                    continuation.suspend();
                    member._continuation=continuation;
                }
                else
                {
                    // 超时后发送空的响应
                    response.setContentType("text/json;charset=utf-8");
                    PrintWriter out=response.getWriter();
                    out.print("{action:\\"poll\\"}");
                }
            }
        }
    }

    private synchronized void chat(HttpServletRequest request,HttpServletResponse response,String username,String message)
    throws IOException
    {
        Map<String,Member> room=_rooms.get(request.getPathInfo());
        if (room!=null)
        {
            // 推送消息到所有成员
            for (Member m:room.values())
            {
                synchronized (m)
                {
                    m._queue.add(username); // from
                    m._queue.add(message);  // chat

                    // 如果轮询到则唤醒
                    if (m._continuation!=null)
                    {
                        m._continuation.resume();
                        m._continuation=null;
                    }
                }
            }
        }

        response.setContentType("text/json;charset=utf-8");
        PrintWriter out=response.getWriter();
        out.print("{action:\\"chat\\"}");  
    }
    
    // 提供嵌入css和js的html服务
    // 这应该是静态内容和真正使用的js库
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
    {
        if (request.getParameter("action")!=null)
            doPost(request,response);
        else
            getServletContext().getNamedDispatcher("default").forward(request,response);
    }
    
}
View Code

  这个ChatServlet 例子,展示了挂起/恢复模式被用来建立一个聊天室(使用了异步servlet)。相同的原则将应用于cometd这样的框架,为这样的应用提供一个基于延续的丰富的环境。

//
//  ========================================================================
//  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.servlets;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

/**
 * Quality of Service Filter.
 * <p>
 * This filter limits the number of active requests to the number set by the "maxRequests" init parameter (default 10).
 * If more requests are received, they are suspended and placed on priority queues.  Priorities are determined by
 * the {@link #getPriority(ServletRequest)} method and are a value between 0 and the value given by the "maxPriority"
 * init parameter (default 10), with higher values having higher priority.
 * <p>
 * This filter is ideal to prevent wasting threads waiting for slow/limited
 * resources such as a JDBC connection pool.  It avoids the situation where all of a
 * containers thread pool may be consumed blocking on such a slow resource.
 * By limiting the number of active threads, a smaller thread pool may be used as
 * the threads are not wasted waiting.  Thus more memory may be available for use by
 * the active threads.
 * <p>
 * Furthermore, this filter uses a priority when resuming waiting requests. So that if
 * a container is under load, and there are many requests waiting for resources,
 * the {@link #getPriority(ServletRequest)} method is used, so that more important
 * requests are serviced first.     For example, this filter could be deployed with a
 * maxRequest limit slightly smaller than the containers thread pool and a high priority
 * allocated to admin users.  Thus regardless of load, admin users would always be
 * able to access the web application.
 * <p>
 * The maxRequest limit is policed by a {@link Semaphore} and the filter will wait a short while attempting to acquire
 * the semaphore. This wait is controlled by the "waitMs" init parameter and allows the expense of a suspend to be
 * avoided if the semaphore is shortly available.  If the semaphore cannot be obtained, the request will be suspended
 * for the default suspend period of the container or the valued set as the "suspendMs" init parameter.
 * <p>
 * If the "managedAttr" init parameter is set to true, then this servlet is set as a {@link ServletContext} attribute with the
 * filter name as the attribute name.  This allows context external mechanism (eg JMX via {@link ContextHandler#MANAGED_ATTRIBUTES}) to
 * manage the configuration of the filter.
 */
@ManagedObject("Quality of Service Filter")
public class QoSFilter implements Filter
{
    private static final Logger LOG = Log.getLogger(QoSFilter.class);

    static final int __DEFAULT_MAX_PRIORITY = 10;
    static final int __DEFAULT_PASSES = 10;
    static final int __DEFAULT_WAIT_MS = 50;
    static final long __DEFAULT_TIMEOUT_MS = -1;

    static final String MANAGED_ATTR_INIT_PARAM = "managedAttr";
    static final String MAX_REQUESTS_INIT_PARAM = "maxRequests";
    static final String MAX_PRIORITY_INIT_PARAM = "maxPriority";
    static final String MAX_WAIT_INIT_PARAM = "waitMs";
    static final String SUSPEND_INIT_PARAM = "suspendMs";

    private final String _suspended = "QoSFilter@" + Integer.toHexString(hashCode()) + ".SUSPENDED";
    private final String _resumed = "QoSFilter@" + Integer.toHexString(hashCode()) + ".RESUMED";
    private long _waitMs;
    private long _suspendMs;
    private int _maxRequests;
    private Semaphore _passes;
    private Queue<AsyncContext>[] _queues;
    private AsyncListener[] _listeners;

    public void init(FilterConfig filterConfig)
    {
        int max_priority = __DEFAULT_MAX_PRIORITY;
        if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM) != null)
            max_priority = Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
        _queues = new Queue[max_priority + 1];
        _listeners = new AsyncListener[_queues.length];
        for (int p = 0; p < _queues.length; ++p)
        {
            _queues[p] = new ConcurrentLinkedQueue<>();
            _listeners[p] = new QoSAsyncListener(p);
        }

        int maxRequests = __DEFAULT_PASSES;
        if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM) != null)
            maxRequests = Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
        _passes = new Semaphore(maxRequests, true);
        _maxRequests = maxRequests;

        long wait = __DEFAULT_WAIT_MS;
        if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM) != null)
            wait = Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
        _waitMs = wait;

        long suspend = __DEFAULT_TIMEOUT_MS;
        if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM) != null)
            suspend = Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
        _suspendMs = suspend;

        ServletContext context = filterConfig.getServletContext();
        if (context != null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
            context.setAttribute(filterConfig.getFilterName(), this);
    }

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
    {
        boolean accepted = false;
        try
        {
            Boolean suspended = (Boolean)request.getAttribute(_suspended);
            if (suspended == null)
            {
                accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
                if (accepted)
                {
                    request.setAttribute(_suspended, Boolean.FALSE);
                    if (LOG.isDebugEnabled())
                        LOG.debug("Accepted {}", request);
                }
                else
                {
                    request.setAttribute(_suspended, Boolean.TRUE);
                    int priority = getPriority(request);
                    AsyncContext asyncContext = request.startAsync();
                    long suspendMs = getSuspendMs();
                    if (suspendMs > 0)
                        asyncContext.setTimeout(suspendMs);
                    asyncContext.addListener(_listeners[priority]);
                    _queues[priority].add(asyncContext);
                    if (LOG.isDebugEnabled())
                        LOG.debug("Suspended {}", request);
                    return;
                }
            }
            else
            {
                if (suspended)
                {
                    request.setAttribute(_suspended, Boolean.FALSE);
                    Boolean resumed = (Boolean)request.getAttribute(_resumed);
                    if (resumed == Boolean.TRUE)
                    {
                        _passes.acquire();
                        accepted = true;
                        if (LOG.isDebugEnabled())
                            LOG.debug("Resumed {}", request);
                    }
                    else
                    {
                        // Timeout! try 1 more time.
                        accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
                        if (LOG.isDebugEnabled())
                            LOG.debug("Timeout {}", request);
                    }
                }
                else
                {
                    // Pass through resume of previously accepted request.
                    _passes.acquire();
                    accepted = true;
                    if (LOG.isDebugEnabled())
                        LOG.debug("Passthrough {}", request);
                }
            }

            if (accepted)
            {
                chain.doFilter(request, response);
            }
            else
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("Rejected {}", request);
                ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
            }
        }
        catch (InterruptedException e)
        {
            ((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
        }
        finally
        {
            if (accepted)
            {
                for (int p = _queues.length - 1; p >= 0; --p)
                {
                    AsyncContext asyncContext = _queues[p].poll();
                    if (asyncContext != null)
                    {
                        ServletRequest candidate = asyncContext.getRequest();
                        Boolean suspended = (Boolean)candidate.getAttribute(_suspended);
                        if (suspended == Boolean.TRUE)
                        {
                            candidate.setAttribute(_resumed, Boolean.TRUE);
                            asyncContext.dispatch();
                            break;
                        }
                    }
                }
                _passes.release();
            }
        }
    }

    /**
     * Computes the request priority.
     * <p>
     * The default implementation assigns the following priorities:
     * <ul>
     * <li> 2 - for an authenticated request
     * <li> 1 - for a request with valid / non new session
     * <li> 0 - for all other requests.
     * </ul>
     * This method may be overridden to provide application specific priorities.
     *
     * @param request the incoming request
     * @return the computed request priority
     */
    protected int getPriority(ServletRequest request)
    {
        HttpServletRequest baseRequest = (HttpServletRequest)request;
        if (baseRequest.getUserPrincipal() != null)
        {
            return 2;
        }
        else
        {
            HttpSession session = baseRequest.getSession(false);
            if (session != null && !session.isNew())
                return 1;
            else
                return 0;
        }
    }

    public void destroy()
    {
    }

    /**
     * Get the (short) amount of time (in milliseconds) that the filter would wait
     * for the semaphore to become available before suspending a request.
     *
     * @return wait time (in milliseconds)
     */
    @ManagedAttribute("(short) amount of time filter will wait before suspending request (in ms)")
    public long getWaitMs()
    {
        return _waitMs;
    }

    /**
     * Set the (short) amount of time (in milliseconds) that the filter would wait
     * for the semaphore to become available before suspending a request.
     *
     *

以上是关于Jetty使用教程(四:28-30)—Jetty开发指南的主要内容,如果未能解决你的问题,请参考以下文章

Jetty使用教程(四:21-22)—Jetty开发指南

Jetty使用教程(四:24-27)—Jetty开发指南

Jetty使用教程——开始使用Jetty

Jetty - 教程

jetty隐藏版本号教程

Jetty简介