SSE:后端向前端发送消息(springboot SseEmitter)

Posted F元凯

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SSE:后端向前端发送消息(springboot SseEmitter)相关的知识,希望对你有一定的参考价值。

背景

有一个项目,前端vue,后端springboot。现在需要做一个功能:用户在使用系统的时候,管理员发布公告,则使用系统的用户可以看到该公告。
基于此,一个简单的方案:前端使用JS方法setInterval,重复调用后端公告获取接口。此方法有几点缺陷:

  • 循环调用的时间间隔不好确定:太长了,获取公告的时效有延迟;太短了,给服务器造成压力,很多请求都是无用的(公告发布的时间不定,很可能几天都没有新公告);
  • token的续期问题:项目中,前端请求,需要带上token,token有过期时间,如果用户一直使用(前后端有交互),会无感续期。如果有这种定时循环和后端交互的场景,就会造成token用不过期(循环的调用会触发续期),当然,可以在续期中,排除某个场景的请求,但是这样的设计不好,因为这种场景太多了,就会造成维护上的困难。

因此就想到了,如果后端主动向前端推送消息,这个问题就可以完美解决。

方案

有两种方案可以实现后端向前端推送消息:

  1. 使用websocket;
  2. 使用sse;

这里介绍SSE的方式(如果系统中对这种消息的准确性和可靠性有严格的要求,则使用websocket,websocket的使用相对复杂的多);
如果想了解SSE的详细基础知识,可以参考阮一峰老师的这篇文章:Server-Sent Events 教程

SSE后端代码

SpringMVC中,已经集成了该功能,所以无需额外引入jar包,直接上代码:

@RestController
@RequestMapping("/notice")
public class NoticeController 

    @Autowired
    private NoticeService noticeService;

    @GetMapping(path = "createSseEmitter")
    public SseEmitter createSseEmitter(String id) 
        return noticeService.createSseEmitter(id);
    

    @PostMapping(path = "sendMsg")
    public boolean sendMsg(String id, String content) 
        noticeService.sendMsg(id, content);
        return true;
    



@Slf4j
@Service
public class NoticeServiceImpl implements NoticeService 
    @Autowired
    @Qualifier("sseEmitterCacheService")
    private CacheService<SseEmitter> sseEmitterCacheService;

    @Override
    public SseEmitter createSseEmitter(String clientId) 
        if (StringUtil.isBlank(clientId)) 
            clientId = UUID.randomUUID().toString().replace("-", "");
        
        SseEmitter sseEmitter = sseEmitterCacheService.getCache(clientId);
        log.info("获取SSE,id=", clientId);
        final String id = clientId;
        sseEmitter.onCompletion(() -> 
            log.info("SSE已完成,关闭连接 id=", id);
            sseEmitterCacheService.deleteCache(id);
        );
        return sseEmitter;
    
    @Override
    public void sendMsg(String clientId, String content) 
        if (sseEmitterCacheService.hasCache(clientId)) 
            SseEmitter sseEmitter = sseEmitterCacheService.getCache(clientId);
            try 
                sseEmitter.send(content);
             catch (IOException e) 
                log.error("发送消息失败:", e.getMessage(), e);
                throw new BusinessRuntimeExcepption(CustomExcetionConstant.IO_ERR, "发送消息失败", e);
            
         else 
            log.error("SSE对象不存在");
            throw new BusinessRuntimeExcepption("SSE对象不存在");
        
    

这里,只列出了核心的代码,简而言之,需要做到两点即可:

  1. 前端首先是发起一个请求,创建SseEmitter,即createSseEmitter方法,该方法必须返回一个SseEmitter对象;
  2. 返回的SseEmitter,后端必须要缓存起来(我用的是ehcache,也可以直接定义一个map来缓存);

为什么要这么做?看下文,后端代码一起来分析就明白了。

前端代码

由于,我请求该接口,需要带上token,所以直接使用EventSource不行,另外这个IE也不支持。所以选择了一个工具:event-source-polyfill。

  1. 先安装event-source-polyfill
npm install event-source-polyfill
  1. 然后使用:
import  EventSourcePolyfill  from "event-source-polyfill";
  created() 
    let _this = this;
    this.source = new EventSourcePolyfill(
      "/" +
        process.env.VUE_APP_MANAGER_PRE_API_URL +
        "/notice/createSseEmitter?id=" +
        uuid(),
      
        headers: 
          [process.env.VUE_APP_OAUTH_AUTHORIZATION]: store.getters.getToken,
        ,
        //重连时间间隔,单位:毫秒,默认45000毫秒,这里设置为10分钟
        heartbeatTimeout: 10 * 60 * 1000,
      
    );

    this.source.onopen = () => 
      console.log("NOTICE建立连接");
    ;
    this.source.onmessage = (e) => 
      _this.scrollMessage = e.data;
      console.log("NOTICE接收到消息");
    ;
    this.source.onerror = (e) => 
      if (e.readyState == EventSource.CLOSED) 
        console.log("NOTICE连接关闭");
       else if (this.source.readyState == EventSource.CONNECTING) 
        console.log("NOTICE正在重连");
        //重新设置header
        this.source.headers = 
          [process.env.VUE_APP_OAUTH_AUTHORIZATION]: store.getters.getToken,
        ;
       else 
        console.log(e);
      
    ;
  ,

有几点说明:

  • new EventSourcePolyfill中,可以带入header
  • heartbeatTimeout是一个心跳时间,默认情况下间隔heartbeatTimeout后,会触发重新连接后端接口;
  • this.source.headers,该行的作用是在重连的时候重新设置header,如果不这样,那么重连的时候,用的参数信息,还是和最开始的一样(包括本例中url中的id)。而由于我的项目中,如果token其他操作触发了刷新token,则有效token可能会变,所以,这里取缓存中放置的token,而不应该使用最初的token。
    好了,这样就基本实现了我们所需要的功能了。

特别注意

前端配置了代理,所以一直收不到后端发送的消息,尝试加入以下参数:

  devServer: 
    compress:false,
    …………

问题

之前在写后端的时候提到了两个问题:为什么要返回SseEmitter对象?为什么要缓存SseEmitter对象?
其实看过SSE的原理,都应该明白:这就是一个长连接,前端调用创建SseEmitter对象的接口,虽然接口返回了,但是并未结束(这就是为什么要返回SseEmitter对象,如果返回的是一个其他对象,就和普通的接口没两样了, 该接口就直接结束了),请看下截图:

发起请求之后,一直是待处理,并未结束,10分钟之后,该请求被取消(前端设置的重连),然后重新发起连接,重新发起的连接也是在等待中。只有接收到消息后,这个请求的状态码才是200,但是这个时候才连接已经建立好了。其中的细节,这里不做讲述。
所以,如果再使用SseEmitter对象发送消息,则前端就可以收到对象的消息了(即实现后端向前端发送消息)。这里使用的SseEmitter对象,就是createSseEmitter接口返回的对象(也就是使用哪个SseEmitter对象,就可以向哪个前端发送消息)。这也就是为什么要缓存SseEmitter对象的原因了。

效果

通过调用发送消息接口,前端即可立即展示发送的消息:

后端向前端推送消息

SpringBoot+WebSocket集成

    什么是WebSocket?
    为什么需要 WebSocket?
    前言
    maven依赖
    WebSocketConfig
    WebSocketServer
    消息推送
    页面发起
    运行效果
    后续
    Websocker注入Bean问题
    netty-websocket-spring-boot-starter
    Springboot2+Netty+Websocket
    ServerEndpointExporter错误
    正式项目的前端WebSocket框架 GoEasy
    `@Component`和`@ServerEndpoint`关于是否单例模式,能否使用static Map等一些问题的解答
    Vue版本的websocket连接

什么是WebSocket?

这里写图片描述
WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
为什么需要 WebSocket?

初次接触 WebSocket 的人,都会问同样的问题:我们已经有了 HTTP 协议,为什么还需要另一个协议?它能带来什么好处?

答案很简单,因为 HTTP 协议有一个缺陷:通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。
这里写图片描述
举例来说,我们想要查询当前的排队情况,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。因此WebSocket 就是这样发明的。

前言

2020-10-20 教程补充:

补充关于@Component和@ServerEndpoint关于是否单例模式等的解答,感谢大家热心提问和研究。
Vue版本的websocket连接方法

2020-01-05 教程补充:

整合了IM相关的优化
优化开启/关闭连接的处理
上传到开源项目spring-cloud-study-websocket,方便大家下载代码。

感谢大家的支持和留言,14W访问量是满满的动力!接下来还会有websocket+redis集群优化篇针对多ws服务器做简单优化处理,敬请期待!

话不多说,马上进入干货时刻。
maven依赖

SpringBoot2.0对WebSocket的支持简直太棒了,直接就有包可以引入

    <dependency>  
       <groupId>org.springframework.boot</groupId>  
       <artifactId>spring-boot-starter-websocket</artifactId>  
   </dependency> 

1
2
3
4

WebSocketConfig

启用WebSocket的支持也是很简单,几句代码搞定

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**

  • 开启WebSocket支持
  • @author zhengkai.blog.csdn.net
    */

@Configuration
public class WebSocketConfig {


@Bean  
public ServerEndpointExporter serverEndpointExporter() {  
    return new ServerEndpointExporter();  
}  

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

WebSocketServer

这就是重点了,核心都在这里。

因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller

直接@ServerEndpoint("/imserver/{userId}") 、@Component启用即可,然后在里面实现@OnOpen开启连接,@onClose关闭连接,@onMessage接收消息等方法。

新建一个ConcurrentHashMap webSocketMap 用于接收当前userId的WebSocket,方便IM之间对userId进行推送消息。单机版实现到这里就可以。

集群版(多个ws节点)还需要借助mysql或者redis等进行处理,改造对应的sendMessage方法即可。

package com.softdev.system.demo.config;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;

/**

  • @author zhengkai.blog.csdn.net
    */

@ServerEndpoint("/imserver/{userId}")
@Component
public class WebSocketServer {

static Log log=LogFactory.get(WebSocketServer.class);
/**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
private static int onlineCount = 0;
/**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/
private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
private Session session;
/**接收userId*/
private String userId="";

/**
 * 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("userId") String userId) {
    this.session = session;
    this.userId=userId;
    if(webSocketMap.containsKey(userId)){
        webSocketMap.remove(userId);
        webSocketMap.put(userId,this);
        //加入set中
    }else{
        webSocketMap.put(userId,this);
        //加入set中
        addOnlineCount();
        //在线数加1
    }

    log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());

    try {
        sendMessage("连接成功");
    } catch (IOException e) {
        log.error("用户:"+userId+",网络异常!!!!!!");
    }
}

/**
 * 连接关闭调用的方法
 */
@OnClose
public void onClose() {
    if(webSocketMap.containsKey(userId)){
        webSocketMap.remove(userId);
        //从set中删除
        subOnlineCount();
    }
    log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
}

/**
 * 收到客户端消息后调用的方法
 *
 * @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
    log.info("用户消息:"+userId+",报文:"+message);
    //可以群发消息
    //消息保存到数据库、redis
    if(StringUtils.isNotBlank(message)){
        try {
            //解析发送的报文
            JSONObject jsonObject = JSON.parseObject(message);
            //追加发送人(防止串改)
            jsonObject.put("fromUserId",this.userId);
            String toUserId=jsonObject.getString("toUserId");
            //传送给对应toUserId用户的websocket
            if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
                webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
            }else{
                log.error("请求的userId:"+toUserId+"不在该服务器上");
                //否则不在这个服务器上,发送到mysql或者redis
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

/**
 *
 * @param session
 * @param error
 */
@OnError
public void onError(Session session, Throwable error) {
    log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
    error.printStackTrace();
}
/**
 * 实现服务器主动推送
 */
public void sendMessage(String message) throws IOException {
    this.session.getBasicRemote().sendText(message);
}


/**
 * 发送自定义消息
 * */
public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
    log.info("发送消息到:"+userId+",报文:"+message);
    if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
        webSocketMap.get(userId).sendMessage(message);
    }else{
        log.error("用户"+userId+",不在线!");
    }
}

public static synchronized int getOnlineCount() {
    return onlineCount;
}

public static synchronized void addOnlineCount() {
    WebSocketServer.onlineCount++;
}

public static synchronized void subOnlineCount() {
    WebSocketServer.onlineCount--;
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146

消息推送

至于推送新信息,可以再自己的Controller写个方法调用WebSocketServer.sendInfo();即可

import com.softdev.system.demo.config.WebSocketServer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;
import java.io.IOException;

/**

  • WebSocketController
  • @author zhengkai.blog.csdn.net
    */

@RestController
public class DemoController {

@GetMapping("index")
public ResponseEntity<String> index(){
    return ResponseEntity.ok("请求成功");
}

@GetMapping("page")
public ModelAndView page(){
    return new ModelAndView("websocket");
}

@RequestMapping("/push/{toUserId}")
public ResponseEntity<String> pushToWeb(String message, @PathVariable String toUserId) throws IOException {
    WebSocketServer.sendInfo(message,toUserId);
    return ResponseEntity.ok("MSG SEND SUCCESS");
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

页面发起

页面用js代码调用websocket,当然,太古老的浏览器是不行的,一般新的浏览器或者谷歌浏览器是没问题的。还有一点,记得协议是ws的,如果使用了一些路径类,可以replace(“http”,“ws”)来替换协议。

<!DOCTYPE html>
<html>
<head>

<meta charset="utf-8">
<title>websocket通讯</title>

</head>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
<script>

var socket;
function openSocket() {
    if(typeof(WebSocket) == "undefined") {
        console.log("您的浏览器不支持WebSocket");
    }else{
        console.log("您的浏览器支持WebSocket");
        //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
        //等同于socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
        //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
        var socketUrl="http://localhost:9999/demo/imserver/"+$("#userId").val();
        socketUrl=socketUrl.replace("https","ws").replace("http","ws");
        console.log(socketUrl);
        if(socket!=null){
            socket.close();
            socket=null;
        }
        socket = new WebSocket(socketUrl);
        //打开事件
        socket.onopen = function() {
            console.log("websocket已打开");
            //socket.send("这是来自客户端的消息" + location.href + new Date());
        };
        //获得消息事件
        socket.onmessage = function(msg) {
            console.log(msg.data);
            //发现消息进入    开始处理前端触发逻辑
        };
        //关闭事件
        socket.onclose = function() {
            console.log("websocket已关闭");
        };
        //发生了错误事件
        socket.onerror = function() {
            console.log("websocket发生了错误");
        }
    }
}
function sendMessage() {
    if(typeof(WebSocket) == "undefined") {
        console.log("您的浏览器不支持WebSocket");
    }else {
        console.log("您的浏览器支持WebSocket");
        console.log(\'{"toUserId":"\'+$("#toUserId").val()+\'","contentText":"\'+$("#contentText").val()+\'"}\');
        socket.send(\'{"toUserId":"\'+$("#toUserId").val()+\'","contentText":"\'+$("#contentText").val()+\'"}\');
    }
}

</script>
<body>
<p>【userId】:<div><input id="userId" name="userId" type="text" value="10"></div>
<p>【toUserId】:<div><input id="toUserId" name="toUserId" type="text" value="20"></div>
<p>【toUserId】:<div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
<p>【操作】:<div>开启socket</div>
<p>【操作】:<div>发送消息</div>
</body>

</html>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

运行效果

v20200105,加入开源项目spring-cloud-study-websocket,更新运行效果,更方便理解。
v1.1的效果,刚刚修复了日志,并且支持指定监听某个端口,代码已经全部更新,现在是这样的效果

打开两个页面,按F12调出控控制台查看测试效果:

页面 参数
http://localhost:9999/demo/page fromUserId=10,toUserId=20
http://localhost:9999/demo/page fromUserId=20,toUserId=10

分别开启socket,再发送消息
在这里插入图片描述
在这里插入图片描述

  1. 向前端推送数据:

    http://localhost:9999/demo/push/10?message=123123

在这里插入图片描述
通过调用push api,可以向指定的userId推送信息,当然报文这里乱写,建议规定好格式。
后续

针对简单IM的业务场景,进行了一些优化,可以看后续的文章SpringBoot2+WebSocket之聊天应用实战(优化版本)(v20201005已整合)

主要变动是CopyOnWriteArraySet改为ConcurrentHashMap,保证多线程安全同时方便利用map.get(userId)进行推送到指定端口。

相比之前的Set,Set遍历是费事且麻烦的事情,而Map的get是简单便捷的,当WebSocket数量大的时候,这个小小的消耗就会聚少成多,影响体验,所以需要优化。在IM的场景下,指定userId进行推送消息更加方便。
Websocker注入Bean问题

关于这个问题,可以看最新发表的这篇文章,在参考和研究了网上一些攻略后,项目已经通过该方法注入成功,大家可以参考。
关于controller调用controller/service调用service/util调用service/websocket中autowired的解决方法
netty-websocket-spring-boot-starter

Springboot2构建基于Netty的高性能Websocket服务器(netty-websocket-spring-boot-starter)
只需要换个starter即可实现高性能websocket,赶紧使用吧
Springboot2+Netty+Websocket

Springboot2+Netty实现Websocket,使用官方的netty-all的包,比原生的websocket更加稳定更加高性能,同等配置情况下可以handle更多的连接。

代码样式全部已经更正,也支持websocket连接url带参数功能,另外也感谢大家的阅读和评论,一起进步,谢谢!~~
ServerEndpointExporter错误

org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘serverEndpointExporter’ defined in class path resource [com/xxx/WebSocketConfig.class]: Invocation of init method failed; nested exception is java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available

感谢@来了老弟儿 的反馈:

如果tomcat部署一直报这个错,请移除 WebSocketConfig 中@Bean ServerEndpointExporter 的注入 。

ServerEndpointExporter 是由Spring官方提供的标准实现,用于扫描ServerEndpointConfig配置类和@ServerEndpoint注解实例。使用规则也很简单:

如果使用默认的嵌入式容器 比如Tomcat 则必须手工在上下文提供ServerEndpointExporter。
如果使用外部容器部署war包,则不需要提供提供ServerEndpointExporter,因为此时SpringBoot默认将扫描服务端的行为交给外部容器处理,所以线上部署的时候要把WebSocketConfig中这段注入bean的代码注掉。

正式项目的前端WebSocket框架 GoEasy

感谢kkatrina的补充,正式的项目中,一般是用第三方websocket框架来做,稳定性、实时性有保证的多,也会包括一些心跳、重连机制。

GoEasy专注于服务器与浏览器,浏览器与浏览器之间消息推送,完美兼容世界上的绝大多数浏览器,包括IE6, IE7之类的非常古老的浏览器。支持Uniapp,各种小程序,react,vue等所有主流Web前端技术。
GoEasy采用 发布/订阅 的消息模式,帮助您非常轻松的实现一对一,一对多的通信。
https://www.goeasy.io/cn/doc/

@Component和@ServerEndpoint关于是否单例模式,能否使用static Map等一些问题的解答

看到大家都在热心的讨论关于是否单例模式这个问题,请大家相信自己的直接,如果websocket是单例模式,还怎么服务这么多session呢。

websocket是原型模式,@ServerEndpoint每次建立双向通信的时候都会创建一个实例,区别于spring的单例模式。
Spring的@Component默认是单例模式,请注意,默认 而已,是可以被改变的。
这里的@Component仅仅为了支持@Autowired依赖注入使用,如果不加则不能注入任何东西,为了方便。
什么是prototype 原型模式? 基本就是你需要从A的实例得到一份与A内容相同,但是又互不干扰的实例B的话,就需要使用原型模式。
关于在原型模式下使用static 的webSocketMap,请注意这是ConcurrentHashMap ,也就是线程安全/线程同步的,而且已经是静态变量作为全局调用,这种情况下是ok的,或者大家如果有顾虑或者更好的想法的化,可以进行改进。 例如使用一个中间类来接收和存放session。
为什么每次都@OnOpen都要检查webSocketMap.containsKey(userId) ,首先了为了代码强壮性考虑,假设代码以及机制没有问题,那么肯定这个逻辑是废的对吧。但是实际使用的时候发现偶尔会出现重连失败或者其他原因导致之前的session还存在,这里就做了一个清除旧session,迎接新session的功能。

Vue版本的websocket连接

感谢@GzrStudy的贡献,供大家参考。

<script>
export default {

data() {
    return {
        socket:null,
        userId:localStorage.getItem("ms_uuid"),
        toUserId:\'2\',
        content:\'3\'
    }
},

methods: {

openSocket() {
  if (typeof WebSocket == "undefined") {
    console.log("您的浏览器不支持WebSocket");
  } else {
    console.log("您的浏览器支持WebSocket");
    //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
    //等同于socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
    //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
    var socketUrl =
      "http://localhost:8081/imserver/" + this.userId;
    socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
    console.log(socketUrl);
    if (this.socket != null) {
      this.socket.close();
      this.socket = null;
    }
    this.socket = new WebSocket(socketUrl);
    //打开事件
    this.socket = new WebSocket(socketUrl);
    //打开事件
    this.socket.onopen = function() {
      console.log("websocket已打开");
      //socket.send("这是来自客户端的消息" + location.href + new Date());
    };
    //获得消息事件
    this.socket.onmessage = function(msg) {
      console.log(msg.data);
      //发现消息进入    开始处理前端触发逻辑
    };
    //关闭事件
    this.socket.onclose = function() {
      console.log("websocket已关闭");
    };
    //发生了错误事件
    this.socket.onerror = function() {
      console.log("websocket发生了错误");
    };
  }
},
sendMessage() {
  if (typeof WebSocket == "undefined") {
    console.log("您的浏览器不支持WebSocket");
  } else {
    console.log("您的浏览器支持WebSocket");
    console.log(
      \'{"toUserId":"\' +
         this.toUserId +
        \'","contentText":"\' +
         this.content +
        \'"}\'
    );
    this.socket.send(
      \'{"toUserId":"\' +
         this.toUserId +
        \'","contentText":"\' +
         this.content +
        \'"}\'
     );

}

}
————————————————
版权声明:本文为CSDN博主「Moshow郑锴」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/moshowg...

以上是关于SSE:后端向前端发送消息(springboot SseEmitter)的主要内容,如果未能解决你的问题,请参考以下文章

后端向前端页面发送变量的方法,可以包含特殊字符(如英文双引号)

SpringBoot及SpringCloud实现webSocket群发及单点发送

SpringBoot+Vue+Websocket 实现服务器端向客户端主动发送消息

Springboot集成SSE实现消息推送之单工通信

将错误消息从 SSE (Webflux) Spring Boot 应用程序传递到 Angular 7 前端

Ajax——后端向前端传数据