SSE 和 Servlet 3.0

Posted

技术标签:

【中文标题】SSE 和 Servlet 3.0【英文标题】:SSE and Servlet 3.0 【发布时间】:2012-06-08 08:17:27 【问题描述】:

我在页面加载时注册了一个典型的 SSE:

客户:

sseTest: function()

var source = new EventSource('mySSE');
source.onopen = function(event)
console.log("eventsource opened!");
;

source.onmessage = function(event)
var data = event.data;
console.log(data);
document.getElementById('sse').innerhtml+=event.data + "<br />";
;

我的 javascript 调试器说,“事件源已打开!”成功了。

我的服务器代码是 Servlet 3.0:

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Random;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(urlPatterns="/mySSE", name = "hello-sse", asyncSupported=true)
public class MyServletSSE extends HttpServlet 

@Override
public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException 

resp.setContentType("text/event-stream");
resp.setCharacterEncoding("UTF-8");

Random random = new Random();
PrintWriter out = resp.getWriter();

//AsyncContext aCtx = req.startAsync(req, resp);
//ServletRequest sReq = aCtx.getRequest();

String next = "data: " + String.valueOf(random.nextInt(100) + 1) + "\n\n";
//out.print("retry: 600000\n"); //set the timeout to 10 mins in milliseconds
out.write(next);
out.flush();
// do not close the stream as EventSource is listening
//out.close();
//super.doGet(req, resp);


代码有效! Client-Code 每 3 秒触发一次 doGet()-Method 并检索新数据。

问题: 但是,我想知道如何通过使用新的 Servlet 3.0 期货(例如 Async-Support 或 asyncContext.addListener(asyncListener) 或其他我不知道的东西)来使这段代码变得更好。由于我从不关闭流,我想知道我的服务器将如何扩展

理论上,最好的方法是在有新数据时通过服务器端代码显式触发doGet()方法,因此客户端不需要触发客户端“onmessage()”方法因此服务器端“doGet()”-每隔 3 秒获取一次新数据。

【问题讨论】:

这是我在 SO 中见过的最好的问题之一,虽然我回答了这个问题,但我实际上从中学到了很多东西,尤其是关于 EventSource! 如果有 1000 个客户端,是否意味着将有 1000 个连接到服务器? 【参考方案1】:

这是一个很好的问题,这是一个完整的工作示例(Servlet 3.0 / Java EE 6)

一些注意事项:

    它通过out.checkError() 处理“浏览器选项卡/窗口关闭”,也调用flush() 我写的很快,所以我相信它可以改进,只是一个POC,在测试之前不要在生产中使用

Servlet:(为简洁起见省略了导入,将很快更新完整的要点)

@WebServlet(urlPatterns = "/mySSE", asyncSupported = true)
public class MyServletSSE extends HttpServlet 

  private final Queue<AsyncContext> ongoingRequests = new ConcurrentLinkedQueue<>();
  private ScheduledExecutorService service;

  @Override
  public void init(ServletConfig config) throws ServletException 
    final Runnable notifier = new Runnable() 
      @Override
      public void run() 
        final Iterator<AsyncContext> iterator = ongoingRequests.iterator();
        //not using for : in to allow removing items while iterating
        while (iterator.hasNext()) 
          AsyncContext ac = iterator.next();
          Random random = new Random();
          final ServletResponse res = ac.getResponse();
          PrintWriter out;
          try 
            out = res.getWriter();
            String next = "data: " + String.valueOf(random.nextInt(100) + 1) + "num of clients = " + ongoingRequests.size() + "\n\n";
            out.write(next);
            if (out.checkError())  //checkError calls flush, and flush() does not throw IOException
              iterator.remove();
            
           catch (IOException ignored) 
            iterator.remove();
          
        
      
    ;
    service = Executors.newScheduledThreadPool(10);
    service.scheduleAtFixedRate(notifier, 1, 1, TimeUnit.SECONDS);
  

  @Override
  public void doGet(HttpServletRequest req, HttpServletResponse res) 
    res.setContentType("text/event-stream");
    res.setCharacterEncoding("UTF-8");

    final AsyncContext ac = req.startAsync();
    ac.setTimeout(60 * 1000);
    ac.addListener(new AsyncListener() 
      @Override public void onComplete(AsyncEvent event) throws IOException ongoingRequests.remove(ac);
      @Override public void onTimeout(AsyncEvent event) throws IOException ongoingRequests.remove(ac);
      @Override public void onError(AsyncEvent event) throws IOException ongoingRequests.remove(ac);
      @Override public void onStartAsync(AsyncEvent event) throws IOException 
    );
    ongoingRequests.add(ac);
  

JSP:

<%@page contentType="text/html" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
        <title>JSP Page</title>
        <script>
            function test() 
                var source = new EventSource('mySSE');
                source.onopen = function(event) 
                    console.log("eventsource opened!");
                ;

                source.onmessage = function(event) 
                    var data = event.data;
                    console.log(data);
                    document.getElementById('sse').innerHTML += event.data + "<br />";
                ;
            
            window.addEventListener("load", test);
        </script>
    </head>
    <body>
        <h1>Hello SSE!</h1>
        <div id="sse"></div>
    </body>
</html>

【讨论】:

一个缓慢的客户端读取将减慢所有其他客户端的写入速度,因为此技术使用阻塞写入。 (这是 2013 年的合适答案)。现代应用程序可能应该使用带有异步 I/O 写入的 Servlet 3.1。 能否给出“Servlet 3.1 with Async I/O writes”的链接? Joakim:这真的有问题吗?我刚刚进行了试运行。两个客户端,一个在 alert() 中阻塞。服务器每 30 秒向两者写入一条短消息。一夜之间跑的很顺利。也许链中有很多缓冲,但仍然......【参考方案2】:

有用的例子。

人们可能会因为 startAsync() 得到“IllegalStateException: Not supported”,在这种情况下不要忘记:

@WebServlet(urlPatterns = "/Sse", asyncSupported=true)

或使用

request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);

来自this 发帖

【讨论】:

AND 也适用于所有适用的过滤器。

以上是关于SSE 和 Servlet 3.0的主要内容,如果未能解决你的问题,请参考以下文章

XML配置Servlet Servlet是3.0版本以下的版本

Java EE - Servlet 3.0 和 Spring MVC

Servlet 3.0 新特性详解

Servlet 3.0 新特性详解 (转载)

Servlet 3.0 新特性详解

Servlet 2.0 && Servlet 3.0 新特性