基于 SSE 实现服务端消息主动推送解决方案

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于 SSE 实现服务端消息主动推送解决方案相关的知识,希望对你有一定的参考价值。

一、SSE 服务端消息推送

SSEServer-Sent Events 的简称, 是一种服务器端到客户端(浏览器)的单项消息推送。对应的浏览器端实现 Event Source 接口被制定为html5 的一部分。不过现在IE不支持该技术,只能通过轮训的方式实现。相比于 WebSocketSSE 简单很多,服务器端和客户端工作量都要小很多、简单很多,同时实现的功能也要有局限。

相比于 WebSocket 两者的区别:

  • WebSocket 是全双工通道,可以双向通信,功能更强。SSE是单向通道,只能服务器向浏览器端发送。

  • WebSocket 是一个新的协议,需要服务器端支持。SSE则是部署在 HTTP协议之上的,现有的服务器软件都支持。

  • SSE是一个轻量级协议,相对简单。WebSocket是一种较重的协议,相对复杂。

  • SSE默认支持断线重连,WebSocket则需要额外部署。

  • SSE支持自定义发送的数据类型。

  • SSE不支持CORS,参数 url 就是服务器网址,必须与当前网页的网址在同一个网域(domain),而且协议和端口都必须相同。

在我们平常使用 SpringBoot 进行开发中,其实已经集成好了 SSE ,里面有个 SseEmitter 类已经封装好了相关操作,可以方便的实现功能。

但是在实现的时候需要注意下是否要兼容 IE 浏览器的使用,IE 浏览器目前不支持 Event Source 接口,如果需要兼容 IE 可以创建一个顺序队列比如放在 Redis 中或 消息队列中,IE 客户端通过接口轮训的方式每次都从队列中消费消息,以实现 SSE 的功能。

下面分别从 服务端 和 客户端进行实施。

二、服务端

服务端需要考虑 IE 浏览器不支持的情况,对于 IE 可以通过轮训的方式实现,首先新建一个 SpringBoot 项目,声明 SseEmitter 连接:

@Slf4j
public class SseEmitterServer 

    private static AtomicInteger count = new AtomicInteger(0);

    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    public static SseEmitter connect(String userId) 
        SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitter.onCompletion(() -> 
            log.info("结束连接:", userId);
            removeUser(userId);
        );
        sseEmitter.onError(throwable -> 
            log.info("连接异常:", userId);
            removeUser(userId);
        );
        sseEmitter.onTimeout(() -> 
            log.info("连接超时:", userId);
            removeUser(userId);
        );
        sseEmitterMap.put(userId, sseEmitter);
        count.getAndIncrement();
        log.info("创建新的sse连接,当前用户:", userId);
        return sseEmitter;
    


    public static void sendMessage(String userId, Object message) 
        if (sseEmitterMap.containsKey(userId)) 
            try 
                sseEmitterMap.get(userId).send(message);
                log.info("SSE 发送信息成功!id =  , message:  ", userId, message);
             catch (IOException e) 
                log.error("[]推送异常:", userId, e.getMessage());
                removeUser(userId);
            
         else 
            log.warn("SSE 发送信息异常,用户不存在:id =  ", userId);
        
    
    
    private static void removeUser(String userId) 
        sseEmitterMap.remove(userId);
        count.getAndDecrement();
    

然后声明 SSEWebServer 释放对客户端的接口,对于 IE 的话我们就直接提供一个接口,每次都从队列中消费消息,这里以 LinkedBlockingDeque 为例实现一个单机的队列,如果是分布式的可能就要考虑 Redis 或 消息队列 :

@Slf4j
@CrossOrigin
@RestController
@RequestMapping("/sse")
public class SSEWebServer 

    private static Cache<String, LinkedBlockingDeque<SseEvent>> sseCache = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(60, TimeUnit.MINUTES)
            .build();

    /* *
     * sse 连接服务
     */
    @GetMapping("/sseEvent/userId")
    public SseEmitter push(@PathVariable("userId") String userId) 
        return SseEmitterServer.connect(userId);
    

    //IE 浏览器不支持SSE 采用轮训
    @GetMapping("/sseEventIE/userId")
    public ResponseEntity pushIe(@PathVariable("userId") String userId) 
        if (StringUtils.isEmpty(userId)) 
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(" userId is Empty ! ");
        
        log.info("IE 连接,userId =  ", userId);
        try 
            SseEvent poll = Objects.requireNonNull(sseCache.getIfPresent(userId)).poll();
            return poll == null ? ResponseEntity.status(HttpStatus.BAD_REQUEST).body("连接失败!") : ResponseEntity.ok().body(poll.getMsg());
         catch (Exception e) 
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(e.getMessage());
        
    

    static boolean publicMsg(SseEvent event) 
        LinkedBlockingDeque<SseEvent> ifPresent = sseCache.getIfPresent(event.getUserId());
        if (ifPresent == null) 
            sseCache.put(event.getUserId(), new LinkedBlockingDeque<SseEvent>());
        
        log.info("添加到队列,userId: ", event.getUserId());
        return Objects.requireNonNull(sseCache.getIfPresent(event.getUserId())).offer(event);
    

上面为考虑 IE 的兼容性,多增加了一个接口和队列,因此在发布的时候,就需要同时向 SSE 和 队列 抛数据,因此这块可以在增加一个事件发布:

事件发布我们就使用 Spring 自带的 ApplicationListener 来实现。

首先创建一个事件交易类:

@Getter
@Setter
@ToString
public class SseEvent<T> extends ApplicationEvent 
    private int code;
    private String userId;
    private T msg;

    public SseEvent(Object source) 
        super(source);
    

声明事件监听,在这里同时向 SSE 和 队列发送消息:

@Slf4j
@Component
public class SseListener implements ApplicationListener<SseEvent> 
    @Override
    public void onApplicationEvent(SseEvent event) 
        SseEmitterServer.sendMessage(event.getUserId(), event.getMsg());
        SSEWebServer.publicMsg(event);
    

最后再 创建一个测试接口,便于我们下面的测试:

@RestController
public class TestController 

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;


    @GetMapping("/test/userId/message")
    public ResponseEntity test(@PathVariable("userId") String userId, @PathVariable("message") String message) 
        SseEvent<String> sseEvent = new SseEvent<>(this);
        sseEvent.setCode(200);
        sseEvent.setMsg(message);
        sseEvent.setUserId(userId);
        applicationEventPublisher.publishEvent(sseEvent);
        return ResponseEntity.ok().build();
    


到此服务端就已经完成了,下面开始客户端的对接:

三、客户端

相比于服务端,客户端就显得非常简单了,但也要考虑 IE 不支持的情况,需要进行判断,如果是 IE 的话,就换成轮训的方式:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>

</body>
<script src="js/jquery-1.10.2.min.js"></script>
<script>

    openSSE("1122", function (msg) 
        console.log("收到服务器推送消息:" + msg);
    );

    function openSSE(userId, callback) 
        if (window.EventSource) 
            var source = new EventSource('http://localhost:8080/sse/sseEvent/' + userId);
            source.onmessage = function (event) 
                callback(event.data);
            ;
         else 
            //ie 不支持sse 采用轮训
            window.setInterval(function () 
                $.ajax(
                    url: "http://localhost:8080/sse/sseEventIE/" + userId + "?" + new Date().getTime(),
                    method: "GET",
                    success: function (result) 
                        callback(result);
                    
                )
            , 1000);
        
    

</script>
</html>

四、效果测试

启动服务端,首先演示 SSE 的效果,使用 goole 浏览器打开客户端网页,可以看到服务端日志的打印:


可以看到客户端已经连接,下面使用测试接口对 1122 用户发送消息,使用浏览器访问下面地址: http://localhost:8080/test/1122/测试 SSE 发送消息!

查看服务端日志打印:

可以看到同时向SSE 和 队列抛出了消息,下面看客户端浏览器打印的日志:


已经收到了服务端推送的消息。

下面开始对 IE 浏览器进行测试,用 IE 浏览器打开页面:

开始了每秒一次的轮训,由于服务端没有消息,一直返回的 400 状态,下面使用上面的接口发送一次消息: http://localhost:8080/test/1122/测试 IE 发送消息!

查看IE浏览器打印的日志:

已经接收到了服务端推送的消息!

以上是关于基于 SSE 实现服务端消息主动推送解决方案的主要内容,如果未能解决你的问题,请参考以下文章

通过SSE(Server-Send Event)实现服务器主动向浏览器端推送消息

PHP+SSE服务器向客户端推送消息

PHP+SSE服务器向客户端推送消息

其他技术---- 服务端推送SSE

重学Springboot系列之服务器推送技术

后端消息推送-SSE协议