重学Springboot系列之服务器推送技术
Posted 大忽悠爱忽悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了重学Springboot系列之服务器推送技术相关的知识,希望对你有一定的参考价值。
重学Springboot系列之服务器推送技术
主流服务器推送技术说明
需求与背景
若干年前,所有的请求都是由浏览器端发起,浏览器本身并没有接受请求的能力。所以一些特殊需求都是用ajax轮询的方式来实现的。比如:
- 股价展示页面实时的获取股价更新
- 赛事的文字直播,实时更新赛况
- 通过页面启动一个任务,前端想知道任务后台的实时运行状态
通常的做法就是需要以较小的间隔,频繁的向服务器建立http连接询问任务状态的更新,然后刷新页面显示状态。但这样做的后果就是浪费大量流量,对服务端造成了非常大的压力。
服务端推送常用技术
在html5被广泛推广之后,我们可以使用服务端主动推送数据,浏览器接收数据
的方式来解决上面提到的问题。下面我们就为大家介绍两种服务端数据推送技术
全双工通信:WebSocket
全双工的,全双工就是双向通信。如果说http协议是“对讲机”之间的通话(你一句我一句,有来有回),那我们的websocket就是移动电话(可以随时发送信息与接收信息,就是全双工)。
本质上是一个额外的tcp连接,建立和关闭时握手使用http协议,其他数据传输不使用http协议 ,更加复杂一些,比较适用于需要进行复杂双向实时数据通讯的场景。在web网页上面的客服、聊天室一般都是使用WebSocket 协议来开发的。
服务端主动推送:SSE (Server Send Event)
html5新标准,用来从服务端实时推送数据到浏览器端, 直接建立在当前http连接上,本质上是保持一个http长连接,轻量协议 。客户端发送一个请求到服务端 ,服务端保持这个请求连接直到一个新的消息准备好,将消息返回至客户端。除非主动关闭,这个连接会一直保持。
- 建立连接
- 服务端 -> 浏览器(连接保持)
- 关闭连接
SSE的一大特色就是重复利用1个连接来接收服务端发送的消息(又称event)
,从而避免不断轮询请求建立连接,造成服务资源紧张。
websocket与SSE比较
但是IE和Edge浏览器不支持SSE,所以SSE目前的应用场景比较少。
虽然websocket在很多比较旧的版本浏览器上面也不兼容,但是总体上比SSE要好不少。另外还有一些开源的JS前端产品,如 SockJS
, Socket.IO
,在浏览器端提供了更好的websocket
前端js编程体验,与浏览器有更好的兼容性。
服务端推送事件SSE
模拟网络支付场景
大家应该都用过支付系统,比如淘宝买一个产品之后进行扫码支付。我们来看看如果结合SSE,该如何实现这个过程。
- 用户扫码向支付系统(支付宝)进行支付
- 支付完成之后,告知商户系统(淘宝卖家系统)我已经发起支付了(建立SSE连接)
- 支付系统(支付宝)告诉商户系统(淘宝卖家系统),这个用户确实支付成功了
- 商户系统(淘宝卖家系统)向用户发送消息:你已经支付成功,跳转到支付成功页面。
(通过SSE连接,由服务器端告知用户客户端浏览器)
注意:在返回最终支付结果的操作,实现了服务端向客户端的事件推送,可以使用SSE来实现
应用场景
从 sse 的特点出发,我们可以大致的判断出它的应用场景,需要轮询获取服务端最新数据的 case 下,多半是可以用它的
比如显示当前网站在线的实时人数,法币汇率显示当前实时汇率,电商大促的实时成交额等等…
sse 规范
在 html5 的定义中,服务端 sse,一般需要遵循以下要求
请求头
开启长连接 + 流方式传递
Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive
数据格式
服务端发送的消息,由 message 组成,其格式如下:
field:value\\n\\n
其中 field 有五种可能
空: 即以:开头,表示注释,可以理解为服务端向客户端发送的心跳,确保连接不中断
data:数据
event: 事件,默认值
id: 数据标识符用 id 字段表示,相当于每一条数据的编号
retry: 重连时间
模拟实现
如果下面的代码理解不了的时候,回头看看这张图
我们写代码来模拟上面时序图中的2、3、4四个步骤的实现。
浏览器前端实现
对于服务器端向浏览器发送的数据,浏览器端需要在 javascript 中使用 EventSource 对象来进行处理。EventSource 使用的是标准的事件监听器方式,只需要在对象上添加相应的事件处理方法即可。EventSource 提供了三个标准事件
除了使用标准的事件处理方法,还可以使用addEventListener 方法对事件进行监听。
var es = new EventSource('事件源名称') ; //与事件源建立连接
//标准事件处理方法,还有onopen、onerror
es.onmessage = function(e)
;
//可以监听自定义的事件名称
es.addEventListener('自定义事件名称', function(e)
);
ssetest.html(商户系统的用户支付页面)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE</title>
</head>
<body>
<div id = "message">
</div>
<script>
if (window.EventSource) //判断浏览器是否支持SSE
//第2步,主动进行建立长连接,表明用户已经发起支付
let source = new EventSource(
'http://localhost/dhy/orderpay?payid=1');
let innerHTML = '';
//监听服务器端发来的事件:open
source.onopen = function(e)
console.log("连接建立")
innerHTML += "onopen:准备就绪,可以开始接收服务器数据" + "<br/>"; //支付结果
document.getElementById("message").innerHTML = innerHTML;
;
//监听服务器端发来的事件:message
source.onmessage = function(e)
console.log("服务器发送的消息为: "+e)
innerHTML += "onmessage:" + e.data + "<br/>"; //支付结果
document.getElementById("message").innerHTML = innerHTML;
;
//自定义finish事件,主动关闭EventSource
source.addEventListener('finish', function(e)
console.log("服务器发送的事件: "+e)
source.close();
innerHTML += "支付结果接收完毕,通知服务端关闭EventSource" + "<br/>";
document.getElementById("message").innerHTML = innerHTML;
, false);
//监听服务器端发来的事件:error
source.onerror = function(e)
console.log("服务器出现的异常: "+e)
if (e.readyState === EventSource.CLOSED)
innerHTML += "sse连接已关闭" + "<br/>";
else
console.log(e);
;
else
console.log("你的浏览器不支持SSE");
</script>
</body>
</html>
服务端实现
Controller代码(商户系统服务端代码)
@RestController
public class SSEControler
//建立之后根据订单id,将SseEmitter存到ConcurrentHashMap
//正常应该存到数据库里面,生成数据库订单,这里我们只是模拟一下
public static final ConcurrentHashMap<Long, SseEmitter> sseEmitters
= new ConcurrentHashMap<>();
//第2步:接受用户建立长连接,表示该用户已支付,已支付就可以生成订单(未确认状态)
@GetMapping("/orderpay")
public SseEmitter orderpay(@RequestParam Long payid) throws IOException
System.out.println("=======orderpay方法执行========");
//设置默认的超时时间3秒,超时之后服务端主动关闭连接。
//超时时间指的是服务器不发送数据给客户端的时间间隔
SseEmitter emitter = new SseEmitter(3 * 1000L);
sseEmitters.put(payid,emitter);
emitter.onTimeout(() -> sseEmitters.remove(payid));
emitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
//执行完毕后的回调接口触发
emitter.onCompletion(() -> System.out.println("完成!!!"));
return emitter;
//第3步:接受支付系统的支付结果告知,表明用户支付成功
@GetMapping("/payback")
public void payback (@RequestParam Long payid)
System.out.println("=======payback方法执行========");
//把SSE连接取出来
SseEmitter emitter = sseEmitters.get(payid);
try
//第4步:由服务端告知浏览器端:该用户支付成功了
emitter.send("用户支付成功"); //触发前端message事件。
//触发前端自定义的finish事件
emitter.send(SseEmitter.event().name("finish").id("6666").data("哈哈"));
catch (IOException e)
emitter.completeWithError(e); //出发前端onerror事件
SseEmitter api介绍
- send(): 发送数据,如果传入的是一个非SseEventBuilder对象,那么传递参数会被封装到 data 中
- complete(): 表示执行完毕,会断开连接
- onTimeout(): 超时回调触发
- onCompletion(): 结束之后的回调触发
访问测试
模拟测试第2步
用户访问 http://localhost:8888/ssetest.html 页面。将自动执行ssetest.html页面的js代码,
let source = new EventSource( 'https://localhost:8888/orderpay?payid=1');
从而模拟用户在浏览器发起支付之后告知“商户系统”:用户已经发起支付。
模拟测试第3步
用PostMan模拟支付系统(支付宝),向商户系统接口 https://localhost:8888/payback?payid=1 发送请求,模拟“支付系统”向我们自己开发的商户系统请求,告知:该用户支付成功。
模拟测试第4步
商户系统告知用户所在的浏览器,你支付成功了(服务器数据推送)。自动在浏览器上将“支付成功”的信息打印出来。
因为是第一次接收服务器端的数据推送,所以打印了图中的第一行文字onopen
因为是接收了服务端的send message,所以打印了图中的第2行文字onmessage
服务端在数据send之后触发了自定义的finish事件,所以打印了图中的第3行文字
对连接超时异常进行全局处理
@ExceptionHandler(AsyncRequestTimeoutException.class)
@ResponseBody
public String handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e)
return SseEmitter.event().data("timeout!!").build().stream()
.map(d -> d.getData().toString())
.collect(Collectors.joining());
SSE技术推荐参考文章
【SringBoot WEB 系列】SSE 服务器发送事件详解
【SpringBoot WEB 系列】SSE 服务器发送事件详解
双向实时通信websocket
整合websocket
<!-- 引入websocket依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
开启websocket功能
@Configuration
public class WebSocketConfig
@Bean
public ServerEndpointExporter serverEndpointExporter()
return new ServerEndpointExporter();
兼容HTTPS协议
- WebSocket的ws协议是基于HTTP协议实现的
- WebSocket的wss协议是基于HTTPS协议实现的
一旦你的项目里面使用了https协议,你的websocket就要使用wss协议才可以。怎么让Spring Boot项目支持WSS协议?
参考我之前的文章 为Web容器配置HTTPS,在那一节TomcatCustomizer 配置的基础之上加上如下的代码,就可以支持wss协议。
@Bean
public TomcatContextCustomizer tomcatContextCustomizer()
return new TomcatContextCustomizer()
@Override
public void customize(Context context)
context.addServletContainerInitializer(new WsSci(), null);
;
WebSocket编程基础
连接的建立
前端js向后端发送wss连接建立请求
如果使用http协议,改为ws即可
socket = new WebSocket("wss://localhost:8888/ws/asset");
SpringBoot服务端WebSocket服务接收类定义如下:
@Component
@Slf4j
@ServerEndpoint(value = "/ws/asset")
public class WebSocketServer
全双工数据交互
前端后端都有
- onopen事件监听,处理连接建立事件
- onmessage事件监听,处理对方发过来的消息数据
- onclose事件监听,处理连接关闭
- onerror事件监听,处理交互过程中的异常
数据发送
浏览器与服务器交换数据
前端JS
socket.send(message);
后端Java,向某一个javax.websocket.Session用户发送消息。
/**
* 发送消息,实践表明,每次浏览器刷新,session会发生变化。
* @param session session
* @param message 消息
*/
private static void sendMessage(Session session, String message) throws IOException
session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)",message,session.getId()));
一个用户向其他用户群发
服务器向所有在线的javax.websocket.Session用户发送消息。
/**
* 群发消息
* @param message 消息
*/
public static void broadCastInfo(String message) throws IOException
for (Session session : SessionSet)
if(session.isOpen())
sendMessage(session, message);
websocket实现聊天软件
WebSocketServer本节内容的核心代码,websocket服务端代码
- @ServerEndpoint(value = “/ws/asset”)表示websocket的接口服务地址
- @OnOpen注解的方法,为连接建立成功时调用的方法
- @OnClose注解的方法,为连接关闭调用的方法
- @OnMessage注解的方法,为收到客户端消息后调用的方法
- @OnError注解的方法,为出现异常时调用的方法
@Component
@Slf4j
@ServerEndpoint(value = "/ws/asset")
public class WebSocketServer
//用来统计连接客户端的数量
private static final AtomicInteger OnlineCount = new AtomicInteger(0);
// concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<>();
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) throws IOException
SessionSet.add(session);
int cnt = OnlineCount.incrementAndGet(); // 在线数加1
log.info("有连接加入,当前连接数为:", cnt);
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws IOException
log.info("来自客户端的消息:",message);
sendMessage(session, "Echo消息内容:"+message);
// broadCastInfo(message); 群发消息
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session)
SessionSet.remove(session);
int cnt = OnlineCount.decrementAndGet();
log.info("有连接关闭,当前连接数为:", cnt);
/**
* 出现错误
*/
@OnError
public void onError(Session session, Throwable error)
log.error("发生错误:,Session ID: ",error.getMessage(),session.getId());
/**
* 发送消息,实践表明,每次浏览器刷新,session会发生变化。
* @param session session
* @param message 消息
*/
private static void sendMessage(Session session, String message) throws IOException
session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)",message,session.getId()));
/**
* 群发消息
* @param message 消息
*/
public static void broadCastInfo(String message) throws IOException
for (Session session : SessionSet)
if(session.isOpen())
sendMessage(session, message重学SpringBoot系列之Mockito测试