SpringBoot:简单使用RSocket的Demo
Posted 你是小KS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot:简单使用RSocket的Demo相关的知识,希望对你有一定的参考价值。
当前版本:SpringBoot2.3.12.RELEASE
、JDK1.8
1. 声明
当前内容主要为个人使用和学习在SpringBoot中操作RSocket,当前内容参考SpringBoot官方文档
具体操作如下:(需要部分Mono的知识)
- 创建RSocket的客户端和服务器
- 客户端请求服务器并响应详细
基本pom依赖
<dependency>
<!-- Import dependency management from Spring Boot -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.3.12.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
2. 基本demo
1.配置当前rsocket的端口(写在application.properties中)
spring.rsocket.server.port=8000
2.创建基本的RSocket配置类
@Configuration
public class RSocketConfig {
private static final int RSOCKET_PORT = 8000;
private static final String RSOCKET_HOST = "localhost";
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
// 配置rsocket的消息处理器
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
// 配置加码器
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
// 配置解码器
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
// 配置路由匹配器
.routeMatcher(new PathPatternRouteMatcher()).build();
}
@Bean
public Mono<RSocketRequester> rsocketRequester() {
// 返回一个Mono对象(用于异步操作,方式同步注入springboot时出现无法注入的问题)
return RSocketRequester.builder().connectTcp(RSOCKET_HOST, RSOCKET_PORT);
}
}
注意这里的RSocketRequester
,必须在bean容器refresh之后才能拿到(原因,注入直接报错,因为会测试连接端口,但是spring容器并未完全启动)
3. 配置客户端Controller
@RestController
public class RSocketClientController {
@Autowired
private Mono<RSocketRequester> rsocketRequesterMono;
private RSocketRequester rSocketRequester;
private RSocketRequester getRSocketRequester() {
if (rSocketRequester == null) {
synchronized (rsocketRequesterMono) {
if (rSocketRequester == null) {
rSocketRequester = rsocketRequesterMono.block(Duration.ofSeconds(10));
}
}
}
return rSocketRequester;
}
@RequestMapping("/sendMsg")
public String sendMsg() {
// 阻塞直到获取连接(持续10秒)
Mono<String> send = getRSocketRequester()
.route("locate.radars.within")
.data("你好,服务器!")
.retrieveMono(String.class);
System.out.println("客户端接收到消息:"+send.block());
return "发送消息成功!";
}
}
直接通过RSocketRequester请求路由,通过携带data数据,和接受返回数据类型retrieveMono方式,最后通过send.block阻塞直到发送数据为止
4. 配置服务器
@RestController
public class RSocketServerController {
@ConnectMapping
void handle(RSocketRequester requester) {
System.out.println("连接rsocket server success!");
/* return "ok"; */ // 这个地方不能有返回值
}
@MessageMapping("locate.radars.within")
public String radars(String msg) {
System.out.println("调用radars方法成功!收到消息为:"+msg);
return "服务器已收到消息:"+msg;
}
}
这里主要配置@ConnectMapping表示连接时会调用,档期那的@MessageMapping就是前面客户端访问的路由地址,data转换为msg并传入,最后将消息返回客户端
这里主要使用tcp连接,如果需要其他方式可以自己配置
3. 测试
启动项目,并查看rsocket的端口(8000)是否已经启动
直接执行测试
注意这里的RSocketRequester只需要一个即可,测试成功!
4. 修改为手动注入RSocketServer
由于使用springboot的自动注入,但是在使用中可能存在其他的问题,所以这里尝试使用手动注入方式
主要修改为:RSocketConfig中的内容
@Configuration
public class RSocketConfig {
private static final int RSOCKET_PORT = 8000;
private static final String RSOCKET_HOST = "localhost";
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
// 配置rsocket的消息处理器
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
// 配置加码器
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
// 配置解码器
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
// 配置路由匹配器
.routeMatcher(new PathPatternRouteMatcher()).build();
}
@Bean
public RSocketServer rsocketServer(@Autowired RSocketMessageHandler handler) {
RSocketServer rSocketServer = RSocketServer.create(handler.responder());
rSocketServer.bind(TcpServerTransport.create(RSOCKET_HOST, RSOCKET_PORT)).block();
return rSocketServer;
}
@Bean
public Mono<RSocketRequester> rsocketRequester() {
// 返回一个Mono对象(用于异步操作,方式同步注入springboot时出现无法注入的问题)
return RSocketRequester.builder().connectTcp(RSOCKET_HOST, RSOCKET_PORT);
}
}
这里使用RSocketServer,且注释掉当前的application.properties中的
#spring.rsocket.server.port=8000
再次启动测试,发现结果一样(这样就可处理启动错误,或者其他问题了),这里结果就不显示了
以上是关于SpringBoot:简单使用RSocket的Demo的主要内容,如果未能解决你的问题,请参考以下文章
禁用 Springboot RSocket 项目的“默认”Netty 端口
Spring RSocket:基于服务注册发现的 RSocket 负载均衡