SpringBoot:简单使用RSocket的Demo

Posted 你是小KS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot:简单使用RSocket的Demo相关的知识,希望对你有一定的参考价值。

当前版本:SpringBoot2.3.12.RELEASEJDK1.8

1. 声明

当前内容主要为个人使用和学习在SpringBoot中操作RSocket,当前内容参考SpringBoot官方文档

具体操作如下:(需要部分Mono的知识)

  1. 创建RSocket的客户端和服务器
  2. 客户端请求服务器并响应详细

基本pom依赖

<dependency>
	<!-- Import dependency management from Spring Boot -->
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-dependencies</artifactId>
	<version>2.3.12.RELEASE</version>
	<type>pom</type>
	<scope>import</scope>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

2. 基本demo

1.配置当前rsocket的端口(写在application.properties中)

spring.rsocket.server.port=8000

2.创建基本的RSocket配置类

@Configuration
public class RSocketConfig {
	private static final int RSOCKET_PORT = 8000;
	private static final String RSOCKET_HOST = "localhost";
	

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		// 配置rsocket的消息处理器
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
				// 配置加码器
				.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
				// 配置解码器
				.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
				// 配置路由匹配器
				.routeMatcher(new PathPatternRouteMatcher()).build();
	}

	@Bean
	public Mono<RSocketRequester> rsocketRequester() {
		// 返回一个Mono对象(用于异步操作,方式同步注入springboot时出现无法注入的问题)
		return RSocketRequester.builder().connectTcp(RSOCKET_HOST, RSOCKET_PORT);
	}

}

注意这里的RSocketRequester,必须在bean容器refresh之后才能拿到(原因,注入直接报错,因为会测试连接端口,但是spring容器并未完全启动)

3. 配置客户端Controller

@RestController
public class RSocketClientController {
	@Autowired
	private Mono<RSocketRequester> rsocketRequesterMono;
	private RSocketRequester rSocketRequester;

	private RSocketRequester getRSocketRequester() {
		if (rSocketRequester == null) {
			synchronized (rsocketRequesterMono) {
				if (rSocketRequester == null) {
					rSocketRequester = rsocketRequesterMono.block(Duration.ofSeconds(10));
				}
			}
		}
		return rSocketRequester;
	}

	@RequestMapping("/sendMsg")
	public String sendMsg() {
		// 阻塞直到获取连接(持续10秒)
		Mono<String> send = getRSocketRequester()
				.route("locate.radars.within")
				.data("你好,服务器!")
				.retrieveMono(String.class);
		System.out.println("客户端接收到消息:"+send.block());
		return "发送消息成功!";
	}

}

直接通过RSocketRequester请求路由,通过携带data数据,和接受返回数据类型retrieveMono方式,最后通过send.block阻塞直到发送数据为止

4. 配置服务器

@RestController
public class RSocketServerController {
	
	@ConnectMapping
	void handle(RSocketRequester requester) {
		System.out.println("连接rsocket server success!");
		/* return "ok"; */ // 这个地方不能有返回值
	}

	@MessageMapping("locate.radars.within")
	public String radars(String msg) {
		System.out.println("调用radars方法成功!收到消息为:"+msg);
		return "服务器已收到消息:"+msg;
	}
}

这里主要配置@ConnectMapping表示连接时会调用,档期那的@MessageMapping就是前面客户端访问的路由地址,data转换为msg并传入,最后将消息返回客户端

这里主要使用tcp连接,如果需要其他方式可以自己配置

3. 测试

启动项目,并查看rsocket的端口(8000)是否已经启动

直接执行测试



注意这里的RSocketRequester只需要一个即可,测试成功!

4. 修改为手动注入RSocketServer

由于使用springboot的自动注入,但是在使用中可能存在其他的问题,所以这里尝试使用手动注入方式

主要修改为:RSocketConfig中的内容

@Configuration
public class RSocketConfig {
	private static final int RSOCKET_PORT = 8000;
	private static final String RSOCKET_HOST = "localhost";

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		// 配置rsocket的消息处理器
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
				// 配置加码器
				.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
				// 配置解码器
				.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
				// 配置路由匹配器
				.routeMatcher(new PathPatternRouteMatcher()).build();
	}

	@Bean
	public RSocketServer rsocketServer(@Autowired RSocketMessageHandler handler) {
		RSocketServer rSocketServer = RSocketServer.create(handler.responder());
		rSocketServer.bind(TcpServerTransport.create(RSOCKET_HOST, RSOCKET_PORT)).block();
		return rSocketServer;
	}

	@Bean
	public Mono<RSocketRequester> rsocketRequester() {
		// 返回一个Mono对象(用于异步操作,方式同步注入springboot时出现无法注入的问题)
		return RSocketRequester.builder().connectTcp(RSOCKET_HOST, RSOCKET_PORT);
	}

}

这里使用RSocketServer,且注释掉当前的application.properties中的

#spring.rsocket.server.port=8000

再次启动测试,发现结果一样(这样就可处理启动错误,或者其他问题了),这里结果就不显示了

以上是关于SpringBoot:简单使用RSocket的Demo的主要内容,如果未能解决你的问题,请参考以下文章

禁用 Springboot RSocket 项目的“默认”Netty 端口

Spring RSocket:基于服务注册发现的 RSocket 负载均衡

RSocket与Spring Security整合

开源 | RSocket Broker:阿里巴巴开源的基于 RSocket 协议的反应式程控消息交换系统

Dubbo3.0新特性集成RSocket,新增响应式支持

聊聊rsocket load balancer的Ewma