重学Springboot系列之服务器推送技术

Posted 大忽悠爱忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了重学Springboot系列之服务器推送技术相关的知识,希望对你有一定的参考价值。


主流服务器推送技术说明

需求与背景

若干年前,所有的请求都是由浏览器端发起,浏览器本身并没有接受请求的能力。所以一些特殊需求都是用ajax轮询的方式来实现的。比如:

  • 股价展示页面实时的获取股价更新
  • 赛事的文字直播,实时更新赛况
  • 通过页面启动一个任务,前端想知道任务后台的实时运行状态

通常的做法就是需要以较小的间隔,频繁的向服务器建立http连接询问任务状态的更新,然后刷新页面显示状态。但这样做的后果就是浪费大量流量,对服务端造成了非常大的压力。


服务端推送常用技术

html5被广泛推广之后,我们可以使用服务端主动推送数据,浏览器接收数据的方式来解决上面提到的问题。下面我们就为大家介绍两种服务端数据推送技术

全双工通信:WebSocket

全双工的,全双工就是双向通信。如果说http协议是“对讲机”之间的通话(你一句我一句,有来有回),那我们的websocket就是移动电话(可以随时发送信息与接收信息,就是全双工)。


本质上是一个额外的tcp连接,建立和关闭时握手使用http协议,其他数据传输不使用http协议 ,更加复杂一些,比较适用于需要进行复杂双向实时数据通讯的场景。在web网页上面的客服、聊天室一般都是使用WebSocket 协议来开发的。


服务端主动推送:SSE (Server Send Event)

html5新标准,用来从服务端实时推送数据到浏览器端, 直接建立在当前http连接上,本质上是保持一个http长连接,轻量协议 。客户端发送一个请求到服务端 ,服务端保持这个请求连接直到一个新的消息准备好,将消息返回至客户端。除非主动关闭,这个连接会一直保持。

  • 建立连接
  • 服务端 -> 浏览器(连接保持)
  • 关闭连接

SSE的一大特色就是重复利用1个连接来接收服务端发送的消息(又称event),从而避免不断轮询请求建立连接,造成服务资源紧张。


websocket与SSE比较


但是IE和Edge浏览器不支持SSE,所以SSE目前的应用场景比较少。 虽然websocket在很多比较旧的版本浏览器上面也不兼容,但是总体上比SSE要好不少。另外还有一些开源的JS前端产品,如 SockJSSocket.IO,在浏览器端提供了更好的websocket前端js编程体验,与浏览器有更好的兼容性。


服务端推送事件SSE

模拟网络支付场景

大家应该都用过支付系统,比如淘宝买一个产品之后进行扫码支付。我们来看看如果结合SSE,该如何实现这个过程。

  • 用户扫码向支付系统(支付宝)进行支付
  • 支付完成之后,告知商户系统(淘宝卖家系统)我已经发起支付了(建立SSE连接)
  • 支付系统(支付宝)告诉商户系统(淘宝卖家系统),这个用户确实支付成功了
  • 商户系统(淘宝卖家系统)向用户发送消息:你已经支付成功,跳转到支付成功页面。(通过SSE连接,由服务器端告知用户客户端浏览器)

注意:在返回最终支付结果的操作,实现了服务端向客户端的事件推送,可以使用SSE来实现


应用场景

从 sse 的特点出发,我们可以大致的判断出它的应用场景,需要轮询获取服务端最新数据的 case 下,多半是可以用它的

比如显示当前网站在线的实时人数,法币汇率显示当前实时汇率,电商大促的实时成交额等等…


sse 规范

在 html5 的定义中,服务端 sse,一般需要遵循以下要求

请求头

开启长连接 + 流方式传递

Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive

数据格式

服务端发送的消息,由 message 组成,其格式如下:

field:value\\n\\n

其中 field 有五种可能

: 即以:开头,表示注释,可以理解为服务端向客户端发送的心跳,确保连接不中断
data:数据
event: 事件,默认值
id: 数据标识符用 id 字段表示,相当于每一条数据的编号
retry: 重连时间

模拟实现

如果下面的代码理解不了的时候,回头看看这张图

我们写代码来模拟上面时序图中的2、3、4四个步骤的实现。

浏览器前端实现

对于服务器端向浏览器发送的数据,浏览器端需要在 javascript 中使用 EventSource 对象来进行处理。EventSource 使用的是标准的事件监听器方式,只需要在对象上添加相应的事件处理方法即可。EventSource 提供了三个标准事件


除了使用标准的事件处理方法,还可以使用addEventListener 方法对事件进行监听。

var es = new EventSource('事件源名称') ;  //与事件源建立连接
//标准事件处理方法,还有onopen、onerror
es.onmessage = function(e) 
;
//可以监听自定义的事件名称
es.addEventListener('自定义事件名称', function(e) 
);

ssetest.html(商户系统的用户支付页面)

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE</title>
</head>
<body>
<div id = "message">

</div>
<script>
    if (window.EventSource)  //判断浏览器是否支持SSE
        //第2步,主动进行建立长连接,表明用户已经发起支付
        let source = new EventSource(
            'http://localhost/dhy/orderpay?payid=1');
        let innerHTML = '';

        //监听服务器端发来的事件:open
        source.onopen = function(e) 
            console.log("连接建立")
            innerHTML += "onopen:准备就绪,可以开始接收服务器数据" + "<br/>"; //支付结果
            document.getElementById("message").innerHTML = innerHTML;
        ;
        //监听服务器端发来的事件:message
        source.onmessage = function(e) 
            console.log("服务器发送的消息为: "+e)
            innerHTML += "onmessage:" + e.data + "<br/>"; //支付结果
            document.getElementById("message").innerHTML = innerHTML;
        ;
        //自定义finish事件,主动关闭EventSource
        source.addEventListener('finish', function(e) 
            console.log("服务器发送的事件: "+e)
            source.close();
            innerHTML += "支付结果接收完毕,通知服务端关闭EventSource" +  "<br/>";
            document.getElementById("message").innerHTML = innerHTML;
        , false);
        //监听服务器端发来的事件:error
        source.onerror = function(e) 
            console.log("服务器出现的异常: "+e)
            if (e.readyState === EventSource.CLOSED) 
                innerHTML += "sse连接已关闭" +  "<br/>";
             else 
                console.log(e);
            
        ;
     else 
        console.log("你的浏览器不支持SSE");
    
</script>

</body>
</html>

服务端实现

Controller代码(商户系统服务端代码)

@RestController
public class SSEControler 
    //建立之后根据订单id,将SseEmitter存到ConcurrentHashMap
    //正常应该存到数据库里面,生成数据库订单,这里我们只是模拟一下
    public static final ConcurrentHashMap<Long, SseEmitter> sseEmitters
            = new ConcurrentHashMap<>();

    //第2步:接受用户建立长连接,表示该用户已支付,已支付就可以生成订单(未确认状态)
    @GetMapping("/orderpay")
    public SseEmitter orderpay(@RequestParam Long payid) throws IOException 
        System.out.println("=======orderpay方法执行========");
        //设置默认的超时时间3秒,超时之后服务端主动关闭连接。
        //超时时间指的是服务器不发送数据给客户端的时间间隔
        SseEmitter emitter = new SseEmitter(3 * 1000L);
        sseEmitters.put(payid,emitter);
        emitter.onTimeout(() -> sseEmitters.remove(payid));
        emitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
        //执行完毕后的回调接口触发
        emitter.onCompletion(() -> System.out.println("完成!!!"));
        return emitter;
    

    //第3步:接受支付系统的支付结果告知,表明用户支付成功
    @GetMapping("/payback")
    public void payback (@RequestParam Long payid)
        System.out.println("=======payback方法执行========");
        //把SSE连接取出来
        SseEmitter emitter = sseEmitters.get(payid);
        try 
            //第4步:由服务端告知浏览器端:该用户支付成功了
            emitter.send("用户支付成功"); //触发前端message事件。
            //触发前端自定义的finish事件
            emitter.send(SseEmitter.event().name("finish").id("6666").data("哈哈"));
         catch (IOException e) 
            emitter.completeWithError(e);   //出发前端onerror事件
        
    


SseEmitter api介绍

  • send(): 发送数据,如果传入的是一个非SseEventBuilder对象,那么传递参数会被封装到 data 中
  • complete(): 表示执行完毕,会断开连接
  • onTimeout(): 超时回调触发
  • onCompletion(): 结束之后的回调触发

访问测试

模拟测试第2步

用户访问 http://localhost:8888/ssetest.html 页面。将自动执行ssetest.html页面的js代码,

let source = new EventSource( 'https://localhost:8888/orderpay?payid=1');

从而模拟用户在浏览器发起支付之后告知“商户系统”:用户已经发起支付。


模拟测试第3步

用PostMan模拟支付系统(支付宝),向商户系统接口 https://localhost:8888/payback?payid=1 发送请求,模拟“支付系统”向我们自己开发的商户系统请求,告知:该用户支付成功。

模拟测试第4步

商户系统告知用户所在的浏览器,你支付成功了(服务器数据推送)。自动在浏览器上将“支付成功”的信息打印出来。


因为是第一次接收服务器端的数据推送,所以打印了图中的第一行文字onopen

因为是接收了服务端的send message,所以打印了图中的第2行文字onmessage

服务端在数据send之后触发了自定义的finish事件,所以打印了图中的第3行文字


对连接超时异常进行全局处理

@ExceptionHandler(AsyncRequestTimeoutException.class)
@ResponseBody
public String handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e) 
    return SseEmitter.event().data("timeout!!").build().stream()
            .map(d -> d.getData().toString())
            .collect(Collectors.joining());



SSE技术推荐参考文章

【SringBoot WEB 系列】SSE 服务器发送事件详解

【SpringBoot WEB 系列】SSE 服务器发送事件详解

SSE技术详解:一种全新的HTML5服务器推送事件技术


双向实时通信websocket

整合websocket

<!-- 引入websocket依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

开启websocket功能

@Configuration
public class WebSocketConfig 
    @Bean
    public ServerEndpointExporter serverEndpointExporter() 
        return new ServerEndpointExporter();
    


兼容HTTPS协议

  • WebSocket的ws协议是基于HTTP协议实现的
  • WebSocket的wss协议是基于HTTPS协议实现的

一旦你的项目里面使用了https协议,你的websocket就要使用wss协议才可以。怎么让Spring Boot项目支持WSS协议?

参考我之前的文章 为Web容器配置HTTPS,在那一节TomcatCustomizer 配置的基础之上加上如下的代码,就可以支持wss协议。

@Bean
public TomcatContextCustomizer tomcatContextCustomizer() 

    return new TomcatContextCustomizer() 
        @Override
        public void customize(Context context) 
            context.addServletContainerInitializer(new WsSci(), null);
        

    ;


WebSocket编程基础

连接的建立

前端js向后端发送wss连接建立请求

如果使用http协议,改为ws即可

socket = new WebSocket("wss://localhost:8888/ws/asset");

SpringBoot服务端WebSocket服务接收类定义如下:

@Component
@Slf4j
@ServerEndpoint(value = "/ws/asset")
public class WebSocketServer   

全双工数据交互

前端后端都有

  • onopen事件监听,处理连接建立事件
  • onmessage事件监听,处理对方发过来的消息数据
  • onclose事件监听,处理连接关闭
  • onerror事件监听,处理交互过程中的异常


数据发送

浏览器与服务器交换数据


前端JS

socket.send(message);

后端Java,向某一个javax.websocket.Session用户发送消息。

/** 
 * 发送消息,实践表明,每次浏览器刷新,session会发生变化。 
 * @param session  session
 * @param message  消息
 */  
private static void sendMessage(Session session, String message) throws IOException
    session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)",message,session.getId()));
  

一个用户向其他用户群发


服务器向所有在线的javax.websocket.Session用户发送消息。


/** 
 * 群发消息 
 * @param message  消息
 */  
public static void broadCastInfo(String message) throws IOException 
    for (Session session : SessionSet)   
        if(session.isOpen())  
            sendMessage(session, message);  
          
      


websocket实现聊天软件

WebSocketServer本节内容的核心代码,websocket服务端代码

  • @ServerEndpoint(value = “/ws/asset”)表示websocket的接口服务地址
  • @OnOpen注解的方法,为连接建立成功时调用的方法
  • @OnClose注解的方法,为连接关闭调用的方法
  • @OnMessage注解的方法,为收到客户端消息后调用的方法
  • @OnError注解的方法,为出现异常时调用的方法
@Component
@Slf4j
@ServerEndpoint(value = "/ws/asset")
public class WebSocketServer   

    //用来统计连接客户端的数量
    private static final AtomicInteger OnlineCount = new AtomicInteger(0);
    // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。  
    private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<>();
    
    /** 
     * 连接建立成功调用的方法 
     */  
    @OnOpen
    public void onOpen(Session session) throws IOException 
        SessionSet.add(session);   
        int cnt = OnlineCount.incrementAndGet(); // 在线数加1  
        log.info("有连接加入,当前连接数为:", cnt);
    

    /**
     * 收到客户端消息后调用的方法
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) throws IOException 
        log.info("来自客户端的消息:",message);
        sendMessage(session, "Echo消息内容:"+message);
        // broadCastInfo(message); 群发消息
    


    /** 
     * 连接关闭调用的方法 
     */  
    @OnClose
    public void onClose(Session session)   
        SessionSet.remove(session);  
        int cnt = OnlineCount.decrementAndGet();  
        log.info("有连接关闭,当前连接数为:", cnt);  
      

    /** 
     * 出现错误
     */  
    @OnError
    public void onError(Session session, Throwable error)   
        log.error("发生错误:,Session ID: ",error.getMessage(),session.getId());
      
  
    /** 
     * 发送消息,实践表明,每次浏览器刷新,session会发生变化。 
     * @param session  session
     * @param message  消息
     */  
    private static void sendMessage(Session session, String message) throws IOException 

        session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)",message,session.getId()));

      
  
    /** 
     * 群发消息 
     * @param message  消息
     */  
    public static void broadCastInfo(String message) throws IOException 
        for (Session session : SessionSet)   
            if(session.isOpen())  
                sendMessage(session, message重学SpringBoot系列之Mockito测试

重学Springboot系列之整合数据库开发框架---上

重学SpringBoot系列之统一全局异常处理

重学Springboot系列之邮件发送的整合与使用

重学SpringBoot系列之整合分布式文件系统

重学SpringBoot系列之redis与spring cache缓存