STOMP原理与应用开发详解
Posted 咬定青松
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了STOMP原理与应用开发详解相关的知识,希望对你有一定的参考价值。
本文首发微信公众号:码上观世界。
STOMP概述
我们已经知道WebSocket是基于TCP协议之上的应用层协议,在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,尤其是允许服务端主动向客户端推送数据,使其在实时推送场景应用很广泛。
正如其名代表的隐含涵义一样,WebSocket虽不是Socket,但是直接使用 WebSocket 就很像使用 Socket来编写 Web 应用程序。因为没有高层级的应用协议,就需要我们定义应用之间所发送消息的语义,还需要确保连接的两端都能遵循这些语义。
STOMP:Simple (or Streaming) Text Orientated Messaging Protocol,即简单(流)文本定向消息协议。最初是为脚本语言(如Ruby、Python和Perl)创建的,用于连接到企业消息代理,它被设计用于处理常用消息传递模式的最小功能子集,STOMP可以用于任何可靠的双向流网络协议,如TCP和WebSocket,虽然STOMP是一个面向文本的协议,但消息payload可以是文本或二进制。就像HTTP在TCP套接字之上添加了请求-响应模型层一样,STOMP 在 WebSocket 之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。STOMP是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议,它提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互,用于client之间进行异步消息传输的简单文本协议。
对于STOMP协议来说, client分为消费者client与生产者client两种. server是指broker, 也就是消息队列的管理者。STOMP协议并不是为websocket设计的, 它是属于消息队列的一种协议, 和amqp, jms平级。只不过由于它的简单性恰巧可以用于定义websocket的消息体格式。STOMP协议很多mq都已支持, 比如rabbitmq, activemq。很多语言也都有STOMP协议的解析client库.
与HTTP请求和响应数据格式类似,STOMP帧由命令、一个或多个头信息以及负载所组成。结构如下:
COMMAND
header1:value1
header2:value2
Body^@
其中^@代表结尾.
例如,如下就是发送数据的一个STOMP帧:
SEND
destination:/app/marco
content-length:20
{\\"message\\":\\"Marco!\\"}
与http相似有三部分组成: 命令, header, 消息体。
命令与header使用utf-8格式, body可以是二进制也可以是文本。
命令有SEND, SUBSCRIBE, MESSAGE, CONNECT, CONNECTED等。
header类似http有content-length, content-type等。
消息体类似http可以是二进制也可以是文本。
STOMP应用开发步骤
1. 添加相应依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>5.3.7</version>
</dependency>
其中spring-messaging提供消息封装和处理相关的功能,为被动引入的依赖,可以不用明确声明依赖。
2. 实现消息请求处理的Controller
spring-messaging提供了@RequestMapping和@SubscribeMapping注解,用于实现消息到达的处理逻辑和消息订阅的处理逻辑,比如客户端向路径/app/greeting发送消息时触发greetings方法调用,当客户端订阅路径/app/subscribe时触发subscribe方法调用,其中app为定义的应用前缀字符串,下文介绍。
@RestController
public class StompController {
@Autowired
private SimpMessagingTemplate template;
@MessageMapping("/greeting")
public void greetings(@Payload String data) {
String msg = "return data:" + data;
template.convertAndSend("/queue/subscribe", msg);
}
@SubscribeMapping("/subscribe")
public String subscribe() {
return "subscribe success";
}
}
@RequestMapping和@SubscribeMapping注解修饰的方法都可以选择性地定义返回值或者定义void方法返回值。当@MessageMapping注解标示的方法有返回值的时候,返回的对象将会进行转换(通过消息转换器)并放到STOMP帧的负载中,然后发给消息代理。默认情况下,帧所发往的目的地会与触发处理器方法的目的地相同,只不过会加上“/topic”前缀。这点可以从代码中得到验证:
class SendToMethodReturnValueHandler
defaultDestinationPrefix在类属性中声明如下:
class SendToMethodReturnValueHandler{
private String defaultDestinationPrefix = "/topic";
private String defaultUserDestinationPrefix = "/queue";
}
有返回值的消息路由可以使用@SendTo或者@SendToUser 重写消息目的地;
如果 @SubscribeMapping 注解的控制器方法有返回值的话,返回值会直接发送到客户端,不经过代理。如果加上@SendTo 注解的话,则要经过消息代理。
如果没有返回值,假如仍需要向客户端发消息,可以使用模板方法发送消息:
template.convertAndSend("/queue/subscribe", msg);
spring-messaging 定义了一个 SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),可以实现自由的向任意目的地发送消息,并且订阅此目的地的所有用户都能收到消息。
@MessageMapping("/greeting")
@SendTo("/queue/greetings")
public void greetings(@Payload String data) {
String msg = "return data:" + data;
template.convertAndSend("/queue/subscribe", msg);
}
@MessageMapping("/message")
@SendToUser("/topic/subscribe")
public String sendToUser(String message) {
return message +" from /users/topic/subscribe";
}
greetings方法中,convertAndSend将消息发送到/queue/subscribe路径上,sendToUser方法中,@SendToUser注解将消息发送到/user/topic/subscribe路径上,路径被加上了前缀/user。
3. 实现配置WebSocketMessageBrokerConfigurer接口
该接口用于配置STOMP的响应端口路径和消息的路由前缀等信息:
上述配置,它重载了registerStompEndpoints()方法,将“/stomp”注册为STOMP端点。客户端在订阅或发布消息到目的地路径前,要连接该端点。这个路径与发送、接收消息和订阅消息的路径不同。
WebSocketStompConfig还通过重载configureMessageBroker()方法配置了一个简单的消息代理。消息代理StompBroker将会处理前缀为“/topic”和“/queue”的消息,即enableSimpleBroker定义的前缀。应用程序将会处理带有“/app”前缀的消息,即setApplicationDestinationPrefixes定义的前缀,在Controller中@RequestMapping和@SubscribeMapping注解修饰的路径默认带上该应用前缀。以/user开头的消息会将消息重路由到某个用户独有的目的地上,这常用于消息的私有推送场景。
enableSimpleBroker创建具有一个或多个用于发送和接收消息的目的地的内存中消息代理。在上面的示例中, 定义了两个目标前缀:topic和queue。StompConfig实例中重写了2个方法,实际上WebSocketMessageBrokerConfigurer还定义了多种方法可以重写:
4. 实现消息客户端
这里以stomp js客户端为例,延时client跟stomp代理服务器之间的交互,script代码如下:
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script type="text/javascript">
var url="ws://localhost:8080/stomp"
var client = Stomp.client(url);
client.subscribe('/topic/message', function(message){
console.log('subscribe topic callback:'+message.body);
});
var connect_callback = function(frame) {
var payload = JSON.stringify({'message':'greeting to stomp broker!'});
client.send("/app/message",{},payload);
};
var error_callback = function(error) {
alert(error.headers.message);
};
var headers = {
login: '',
passcode: '',
// additional header
'client-id': 'stomp-client-id'
};
client.connect(headers, connect_callback, error_callback);
</script>
从浏览器console中打印信息来看,上述过程涉及到的消息类型有
CONNECT,CONNECTED,SUBSCRIBE,SEND,MESSGE
其中CONNECT,CONNECTED,SUBSCRIBE完成了客户端和代理服务器之间的连接和连接确认以及消息订阅的过程,SEND,MESSGE涉及到了客户端和服务器之间的发消息和响应消息的过程。
STOMP与spring-messaging
一旦公开了STOMP端点,Spring应用程序就成为连接的客户端的STOMP代理,本节描述服务器端的消息流。
spring-messaging模块,包括几部分的消息组件:
Message:消息实体的抽象,包括消息头和消息体
MessageHandler:消息处理抽象
MessageChannel:消息发送渠道,生产者和消费者之间消息发送的抽象
其中MessageChannel有多种扩展子类,比如:
SubscribableChannel:消息订阅渠道,订阅和取消订阅消息的抽象
ExecutorSubscribableChannel:消息订阅渠道的一种,负责使用Executor向订阅者发送消息
MessageHandler 的具体实现类共有 两类九种,分别用来处理不同类型的Message
未实现org.springframework.context.SmartLifecycle接口:
UserRegistryMessageHandler
NoOpMessageHandler
实现org.springframework.context.SmartLifecycle接口:
SubProtocolWebSocketHandler
SimpAnnotationMethodMessageHandler
WebSocketAnnotationMethodMessageHandler
NoOpBrokerMessageHandler
SimpleBrokerMessageHandler
StompBrokerRelayMessageHandler
UserDestinationMessageHandler
其中SimpleBrokerMessageHandler用于实现基于内存的简单Broker消息代理,而StompBrokerRelayMessageHandler可以通过中继方式使用外部消息代理,如RabbitMQ等。开启中继代理可以通过如下方式:
registry.enableStompBrokerRelay("/topic","/queue");
两类接口的区别是是否实现SmartLifecycle接口,SmartLifecycle接口主要提供有关生命周期的start和stop等方法:
比如AbstractBrokerMessageHandler,它是NoOpBrokerMessageHandler、SimpleBrokerMessageHandler和StompBrokerRelayMessageHandler的父类接口,实现了SmartLifecycle,在start()、stop()方法中分别实现了对相关MessageChannel的订阅和取消订阅。不实现SmartLifecycle接口的那类MessageHandler就是不需要绑定MessageChannel,相反另一类则是用来在指定生命周期订阅或解订阅MessageChannel。
图中还涉及到了三个消息通道:
clientInboundChannel:传递从WebSocket客户端收到的消息
clientOutboundChannel:用于向WebSocket客户端发送服务器消息。
brokerChannel:用于从服务器端应用程序代码中向消息代理发送消息。
其中clientInboundChannel和brokerChannel都属于MessageChannel子类型,用于管理消息的订阅和取消订阅。
当从WebSocket连接接收到消息时,它们被解码到STOMP帧,转换为Spring Message表示,并发送到clientInboundChannel进行进一步处理。例如,以/app开头的destination header的STOMP消息可以路由到带注解的控制器中的@MessageMapping方法,而/topic和/queue消息可以直接路由到消息代理。
处理来自客户端的STOMP消息的@Controller注解可以通过brokerChannel向消息代理发送消息,而代理通过clientOutboundChannel向匹配的订阅者广播消息。
以上文中示例代码为例,其消息从生产者发出到消费者消费, 流程如下:
客户端连接到ws://localhost:8080/stomp,一旦建立了WebSocket连接,STOMP帧就开始在其上流动。
客户端发送带有/topic/message的destination header的SUBSCRIBE帧,接收和解码后,消息被发送到clientInboundChannel,然后被路由到消息代理,消息代理存储客户端订阅。
客户端向/app/greeting发送一个SEND帧,/app前缀有助于将其路由到带注解的控制器。去掉/app前缀后,destination剩余的/greeting部分被映射到@MessageMapping修饰的greetings方法。
greetings方法返回的值被转换为一个Spring Message,其payload基于返回值和/topic/greeting的默认destination header(由/app替换为/topic的输入destination派生而来)。这种情况当没有被@SendTo或者@SendToUser修饰时发生,否则该路由将被重写到指定的路径。
消息代理找到所有匹配的订阅者,并通过clientOutboundChannel向每个订阅者发送MESSAGE帧,消息从该通道编码为STOMP帧并在WebSocket连接上发送。
参考资料
https://stomp.github.io/stomp-specification-1.2.html#STOMP_Frames
https://blog.csdn.net/a617137379/article/details/78765025
更多文章,欢迎扫码关注:
以上是关于STOMP原理与应用开发详解的主要内容,如果未能解决你的问题,请参考以下文章
将 Spring websockets (sockJS + Stomp) 与基于令牌的身份验证 (JWT) 一起使用的最佳方法