Servlet-3 Async Context,如何进行异步写入?
Posted
技术标签:
【中文标题】Servlet-3 Async Context,如何进行异步写入?【英文标题】:Servlet-3 Async Context, how to do asynchronous writes? 【发布时间】:2012-08-18 14:11:46 【问题描述】:问题描述
Servlet-3.0 API 允许分离请求/响应上下文并在稍后回复它。
但是,如果我尝试写入大量数据,例如:
AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush()
对于 Tomcat 7 和 Jetty 8,它实际上可能会阻塞 - 而且它确实会阻塞在琐碎的测试用例中。教程建议创建一个线程池,该线程池将 处理这样的设置 - 女巫通常与传统的 10K 架构相反。
但是,如果我有 10,000 个打开的连接和一个线程池,比如说 10 个线程, 对于 1% 的低速连接或刚刚被阻塞的客户端来说,这已经足够了 连接阻塞线程池并完全阻塞彗星响应或 显着减慢速度。
预期的做法是获得“写就绪”通知或 I/O 完成通知 而不是继续推送数据。
如何使用 Servlet-3.0 API 做到这一点,即我如何获得:
关于 I/O 操作的异步完成通知。 通过写就绪通知获取非阻塞 I/O。如果 Servlet-3.0 API 不支持,是否有任何 Web 服务器特定的 API(如 Jetty Continuation 或 Tomcat CometEvent)允许真正异步处理此类事件,而无需使用线程池伪造异步 I/O。
有人知道吗?
如果这不可能,您能否通过参考文档来确认?
示例代码中的问题演示
我附上了下面模拟事件流的代码。
注意事项:
它使用ServletOutputStream
抛出IOException
来检测断开连接的客户端
它发送keep-alive
消息以确保客户端仍然存在
我创建了一个线程池来“模拟”异步操作。
在这样的示例中,我明确定义了大小为 1 的线程池来显示问题:
启动应用程序 从两个终端运行curl http://localhost:8080/path/to/app
(两次)
现在用curd -d m=message http://localhost:8080/path/to/app
发送数据
两个客户端都收到了数据
现在暂停其中一个客户端 (Ctrl+Z) 并再次发送消息curd -d m=message http://localhost:8080/path/to/app
观察到另一个未挂起的客户端要么没有收到任何消息,要么在传输消息后停止接收保持活动的请求,因为其他线程被阻塞。
我想在不使用线程池的情况下解决这样的问题,因为开了1000-5000 连接我可以很快耗尽线程池。
下面的示例代码。
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;
@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet
private long id = 0;
private String message = "";
private final ThreadPoolExecutor pool =
new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
// it is explicitly small for demonstration purpose
private final Thread timer = new Thread(new Runnable()
public void run()
try
while(true)
Thread.sleep(1000);
sendKeepAlive();
catch(InterruptedException e)
// exit
);
class RunJob implements Runnable
volatile long lastUpdate = System.nanoTime();
long id = 0;
AsyncContext ac;
RunJob(AsyncContext ac)
this.ac = ac;
public void keepAlive()
if(System.nanoTime() - lastUpdate > 1000000000L)
pool.submit(this);
String formatMessage(String msg)
StringBuilder sb = new StringBuilder();
sb.append("id");
sb.append(id);
for(int i=0;i<100000;i++)
sb.append("data:");
sb.append(msg);
sb.append("\n");
sb.append("\n");
return sb.toString();
public void run()
String message = null;
synchronized(HugeStreamWithThreads.this)
if(this.id != HugeStreamWithThreads.this.id)
this.id = HugeStreamWithThreads.this.id;
message = HugeStreamWithThreads.this.message;
if(message == null)
message = ":keep-alive\n\n";
else
message = formatMessage(message);
if(!sendMessage(message))
return;
boolean once_again = false;
synchronized(HugeStreamWithThreads.this)
if(this.id != HugeStreamWithThreads.this.id)
once_again = true;
if(once_again)
pool.submit(this);
boolean sendMessage(String message)
try
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(message);
out.flush();
lastUpdate = System.nanoTime();
return true;
catch(IOException e)
ac.complete();
removeContext(this);
return false;
;
private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();
@Override
public void init(ServletConfig config) throws ServletException
super.init(config);
timer.start();
@Override
public void destroy()
for(;;)
try
timer.interrupt();
timer.join();
break;
catch(InterruptedException e)
continue;
pool.shutdown();
super.destroy();
protected synchronized void removeContext(RunJob ac)
asyncContexts.remove(ac);
// GET method is used to establish a stream connection
@Override
protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException
// Content-Type header
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
// Access-Control-Allow-Origin header
response.setHeader("Access-Control-Allow-Origin", "*");
final AsyncContext ac = request.startAsync();
ac.setTimeout(0);
RunJob job = new RunJob(ac);
asyncContexts.add(job);
if(id!=0)
pool.submit(job);
private synchronized void sendKeepAlive()
for(RunJob job : asyncContexts)
job.keepAlive();
// POST method is used to communicate with the server
@Override
protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException
request.setCharacterEncoding("utf-8");
id++;
message = request.getParameter("m");
for(RunJob job : asyncContexts)
pool.submit(job);
上面的示例使用线程来防止阻塞...但是,如果阻塞客户端的数量大于线程池的大小,它将阻塞。
如何在不阻塞的情况下实现?
【问题讨论】:
我也对这个问题的答案非常感兴趣。一般来说,似乎不可能获得对底层通道的非阻塞访问,但有一些警告,我们可以防止客户端咀嚼线程并对其他客户端造成过于严重的影响。最终,我希望可移植 servlets API 公开一种进行正确非阻塞写入的方法,但我怀疑这会很快到来(他们会说“只写一个 bean/应用程序”而不是使用 servlet 容器)。如果您的 servlet 容器友好,我认为我的解决方案基本上适用于您/我更有限的问题。 你可以看到这个技术,因为我已经在github.com/NWilson/oidrelay 的一个 github 项目中实现了它。欢迎任何cmets!星期六我开始使用 Java(Haskell 和 C 的重度用户),我只用了几个晚上。 【参考方案1】:我发现 Servlet 3.0
Asynchronous
API 很难正确实现,而且有用的文档很少。经过大量试验和错误并尝试了许多不同的方法,我能够找到一个我非常满意的强大解决方案。当我查看我的代码并将其与您的代码进行比较时,我注意到一个可能会帮助您解决特定问题的主要区别。我使用ServletResponse
写入数据,而不是ServletOutputStream
。
我的首选异步 Servlet 类针对您的 some_big_data
案例做了一些调整:
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.apache.log4j.Logger;
@javax.servlet.annotation.WebServlet(urlPatterns = "/async" , asyncSupported = true, initParams = @WebInitParam(name = "threadpoolsize", value = "100") )
public class AsyncServlet extends HttpServlet
private static final Logger logger = Logger.getLogger(AsyncServlet.class);
public static final int CALLBACK_TIMEOUT = 10000; // ms
/** executor service */
private ExecutorService exec;
@Override
public void init(ServletConfig config) throws ServletException
super.init(config);
int size = Integer.parseInt(getInitParameter("threadpoolsize"));
exec = Executors.newFixedThreadPool(size);
@Override
public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
final AsyncContext ctx = req.startAsync();
final HttpSession session = req.getSession();
// set the timeout
ctx.setTimeout(CALLBACK_TIMEOUT);
// attach listener to respond to lifecycle events of this AsyncContext
ctx.addListener(new AsyncListener()
@Override
public void onComplete(AsyncEvent event) throws IOException
logger.info("onComplete called");
@Override
public void onTimeout(AsyncEvent event) throws IOException
logger.info("onTimeout called");
@Override
public void onError(AsyncEvent event) throws IOException
logger.info("onError called: " + event.toString());
@Override
public void onStartAsync(AsyncEvent event) throws IOException
logger.info("onStartAsync called");
);
enqueLongRunningTask(ctx, session);
/**
* if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
* <p/>
* if the @link AsyncContext#getResponse() is null, that means this context has already timed out (and context listener has been invoked).
*/
private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session)
exec.execute(new Runnable()
@Override
public void run()
String some_big_data = getSomeBigData();
try
ServletResponse response = ctx.getResponse();
if (response != null)
response.getWriter().write(some_big_data);
ctx.complete();
else
throw new IllegalStateException(); // this is caught below
catch (IllegalStateException ex)
logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called.
catch (Exception e)
logger.error("ERROR IN AsyncServlet", e);
);
/** destroy the executor */
@Override
public void destroy()
exec.shutdown();
【讨论】:
几个问题:(1) ctx.complete() 关闭响应,我实际上想重用它以稍后发送更多数据。 (2) 使用 response.getWriter() 不好,因为它不会抛出 IOException,所以我不知道例如客户端是否已断开连接;见***.com/questions/12039939/…; (3) 看来您实际上是在使用线程池来“解决”问题,而不是使响应真正异步。 @Artyom 从客户端的角度来看,响应确实是异步的。客户端发出请求,服务器将在未来的某个时间做出响应。使用 HTTP 1.1 标准,连接保持活动状态并重复用于多个请求,从而允许这种异步行为。调用 ctx.complete() 会关闭响应,但您需要做的就是让客户端在第一次收到数据后直接再次请求数据,如果您甚至需要它。另请参阅:***.com/questions/7124508/… 您所描述的是长轮询,但我说的是 HTTP 流,它是允许的。这就是服务器发送事件的构建方式:w3.org/TR/eventsource,这就是我想要构建的。基本上我不会关闭连接,而是继续在新事件时流式传输数据。即我说的是服务器端异步响应处理(不是客户端) @Artyom 同意。我实现的是使用 Servlet 3.0 API 中指定的异步特性进行长轮询。我怀疑你可以用它来实现你想要的。也许您需要的是来自 Tomcat 7.0.27 及更高版本的新 WebSocket 实现:tomcat.apache.org/tomcat-7.0-doc/web-socket-howto.html 为努力提供赏金。不是因为它实际上是答案。所以我不接受。【参考方案2】:在我研究这个话题的过程中,这个帖子一直在弹出,所以想在这里提一下:
Servlet 3.1 在 ServletInputStream
和 ServletOutputStream
上引入了异步操作。见ServletOutputStream.setWriteListener
。
一个例子可以在http://docs.oracle.com/javaee/7/tutorial/servlets013.htm找到
【讨论】:
【参考方案3】:这可能会有所帮助
http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/async-servlet/async-servlets.html
【讨论】:
【参考方案4】:我们不能完全使写入异步。实际上,我们必须忍受这样的限制,即当我们向客户写一些东西时,我们希望能够及时这样做,并且如果我们不这样做,就能够将其视为错误。也就是说,如果我们的目标是尽可能快地将数据流式传输到客户端,并使用通道的阻塞/非阻塞状态作为控制流量的一种方式,那么我们就不走运了。但是,如果我们以客户端应该能够处理的低速率发送数据,我们至少能够及时断开读取速度不够快的客户端。
例如,在您的应用程序中,我们以较慢的速度(每隔几秒)发送 keepalive,并希望客户端能够跟上他们正在发送的所有事件。我们将数据挥霍给客户端,如果跟不上,我们可以及时干净地断开连接。这比真正的异步 I/O 更受限制,但它应该可以满足您的需求(顺便说一下,我的)。
诀窍在于,所有仅抛出 IOExceptions 的写入输出的方法实际上做的远不止这些:在实现中,所有对可能被中断()的事物的调用都将用类似这样的东西包装(取自 Jetty 9):
catch (InterruptedException x)
throw (IOException)new InterruptedIOException().initCause(x);
(我还注意到,这不会发生在 Jetty 8 中,其中记录了 InterruptedException 并立即重试阻塞循环。大概您确保您的 servlet 容器行为良好使用这个技巧。)
也就是说,当慢速客户端导致写入线程阻塞时,我们只需通过在线程上调用 interrupt() 来强制将写入作为 IOException 抛出。想一想:非阻塞代码无论如何都会在我们的一个处理线程上消耗一个时间单位来执行,因此使用刚刚中止的阻塞写入(比如一毫秒之后)在原则上实际上是相同的。我们仍然只是在线程上花费了很短的时间,只是效率稍微低了一点。
我已经修改了您的代码,以便主计时器线程在我们开始写入之前运行一个作业来限制每次写入的时间,如果写入快速完成,该作业将被取消,这是应该的。
最后一点:在实现良好的 servlet 容器中,导致 I/O 抛出 应该 是安全的。如果我们能捕捉到 InterruptedIOException 并稍后再次尝试写入,那就太好了。如果慢速客户端跟不上完整的流,我们可能想给他们事件的一个子集。据我所知,在 Jetty 中这并不完全安全。如果写入抛出,则 HttpResponse 对象的内部状态可能不够一致,无法处理以后安全地重新进入写入。我认为尝试以这种方式推送 servlet 容器是不明智的,除非我错过了提供此保证的特定文档。我认为这个想法是,如果发生 IOException,连接被设计为关闭。
这里是代码,修改后的 RunJob::run() 使用了一个非常简单的说明(实际上,我们希望在这里使用主计时器线程,而不是每次写入都启动一个,这很愚蠢) .
public void run()
String message = null;
synchronized(HugeStreamWithThreads.this)
if(this.id != HugeStreamWithThreads.this.id)
this.id = HugeStreamWithThreads.this.id;
message = HugeStreamWithThreads.this.message;
if(message == null)
message = ":keep-alive\n\n";
else
message = formatMessage(message);
final Thread curr = Thread.currentThread();
Thread canceller = new Thread(new Runnable()
public void run()
try
Thread.sleep(2000);
curr.interrupt();
catch(InterruptedException e)
// exit
);
canceller.start();
try
if(!sendMessage(message))
return;
finally
canceller.interrupt();
while (true)
try canceller.join(); break;
catch (InterruptedException e)
boolean once_again = false;
synchronized(HugeStreamWithThreads.this)
if(this.id != HugeStreamWithThreads.this.id)
once_again = true;
if(once_again)
pool.submit(this);
【讨论】:
致downvoter:谢谢,任何cmets为什么?我在这个答案上很努力,我自己想知道是否有更好的解决方案。如果您认为有更好的方法,请分享。【参考方案5】:Spring 适合您吗? Spring-MVC 3.2 有一个名为DeferredResult
的类,它将优雅地处理您的“10,000 个打开连接/10 个服务器池线程”场景。
示例:http://blog.springsource.org/2012/05/06/spring-mvc-3-2-preview-introducing-servlet-3-async-support/
【讨论】:
(我以前看过这个链接,和其他很多人一样)它是如何异步工作的? 客户端:通过 WebSockets 或长轮询;服务器端:异步DeferredResult
将在非服务器池线程中处理后返回。查看 spring-mvc-chat 示例git link;它真的很简洁,应该能够在很短的时间内告诉你它是否是你需要的。【参考方案6】:
我快速浏览了您的列表,所以我可能遗漏了一些要点。 池线程的优点是随着时间的推移在多个任务之间共享线程资源。也许您可以通过间隔不同http连接的keepAlive响应来解决您的问题,而不是同时将所有这些响应分组。
【讨论】:
以上是关于Servlet-3 Async Context,如何进行异步写入?的主要内容,如果未能解决你的问题,请参考以下文章
Flask-mail send_async_email() 生成异常和 RunTimeError: Working outside of application context