rsocket-java小试牛刀

Posted 码匠的流水账

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rsocket-java小试牛刀相关的知识,希望对你有一定的参考价值。

本文主要研究一下rsocket-java

RSocket

rsocket-core-0.12.1-sources.jar!/io/rsocket/RSocket.java

public interface RSocket extends Availability, Closeable {

/**
  * Fire and Forget interaction model of {@code RSocket}.
  *
  * @param payload Request payload.
  * @return {@code Publisher} that completes when the passed {@code payload} is successfully
  *     handled, otherwise errors.
  */
Mono<Void> fireAndForget(Payload payload);

/**
  * Request-Response interaction model of {@code RSocket}.
  *
  * @param payload Request payload.
  * @return {@code Publisher} containing at most a single {@code Payload} representing the
  *     response.
  */
Mono<Payload> requestResponse(Payload payload);

/**
  * Request-Stream interaction model of {@code RSocket}.
  *
  * @param payload Request payload.
  * @return {@code Publisher} containing the stream of {@code Payload}s representing the response.
  */
Flux<Payload> requestStream(Payload payload);

/**
  * Request-Channel interaction model of {@code RSocket}.
  *
  * @param payloads Stream of request payloads.
  * @return Stream of response payloads.
  */
Flux<Payload> requestChannel(Publisher<Payload> payloads);

/**
  * Metadata-Push interaction model of {@code RSocket}.
  *
  * @param payload Request payloads.
  * @return {@code Publisher} that completes when the passed {@code payload} is successfully
  *     handled, otherwise errors.
  */
Mono<Void> metadataPush(Payload payload);

@Override
default double availability() {
    return isDisposed() ? 0.0 : 1.0;
}
}
  • RSocket接口继承了Availability(定义double availability()方法)及Closeable(定义了Mono<Void> onClose()方法)接口

  • RSocket定义了fireAndForget、requestResponse、requestStream、requestChannel方法分别对应4种Interaction Model

  • RSocket的Frame包含metadata及data payload,其中metadata可选,可以用于描述data payload,因而RSocket还定义了metadataPush方法用于push metadata

Interaction Model

fireAndForget

    @Test
  public void testFireAndForget() throws InterruptedException {
      //SERVER
      RSocketFactory.receive()
              .acceptor(
                      (setupPayload, reactiveSocket) ->
                              Mono.just(
                                      new AbstractRSocket() {
                                          @Override
                                          public Mono<Void> fireAndForget(Payload payload) {
                                                System.out.printf("fire-forget: %s%n", payload.getDataUtf8());
                                              return Mono.empty();
                                          }
                                      }))
              .transport(TcpServerTransport.create("localhost", 8080))
              .start()
              .subscribe();

      //CLIENT
      RSocket socket =
              RSocketFactory.connect()
                      .transport(TcpClientTransport.create("localhost", 8080))
                      .start()
                      .block();

      socket
              .fireAndForget(DefaultPayload.create("Hello"))
              .block();

      socket.dispose();

      TimeUnit.SECONDS.sleep(5);
  }

类似udp,无需ack,比较适合metrics上报、访问日志上报等

requestResponse

    @Test
  public void testRequestResponse(){
      //SERVER
      RSocketFactory.receive()
              .acceptor(
                      (setupPayload, reactiveSocket) ->
                              Mono.just(
                                      new AbstractRSocket() {
                                          @Override
                                          public Mono<Payload> requestResponse(Payload p) {
                                              return Mono.just(p);
                                          }
                                      }))
              .transport(TcpServerTransport.create("localhost", 8080))
              .start()
              .subscribe();

      //CLIENT
      RSocket socket =
              RSocketFactory.connect()
                      .transport(TcpClientTransport.create("localhost", 8080))
                      .start()
                      .block();

      socket
              .requestResponse(DefaultPayload.create("Hello"))
                .map(Payload::getDataUtf8)
              .onErrorReturn("error")
                .doOnNext(System.out::println)
              .block();

      socket.dispose();
  }

类似http,但是优于http,因为它是异步的,而且是multiplexed

requestStream

    @Test
  public void testRequestStream(){
      //SERVER
      RSocketFactory.receive()
              .acceptor(new SocketAcceptor() {
                  @Override
                  public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
                      return Mono.just(
                              new AbstractRSocket() {
                                  @Override
                                  public Flux<Payload> requestStream(Payload payload) {
                                      return Flux.interval(Duration.ofMillis(100))
                                                .map(aLong -> DefaultPayload.create("Interval: " + aLong));
                                  }
                              });
                  }
              })
              .transport(TcpServerTransport.create("localhost", 7000))
              .start()
              .subscribe();

      //CLIENT
      RSocket socket =
              RSocketFactory.connect()
                      .transport(TcpClientTransport.create("localhost", 7000))
                      .start()
                      .block();

      socket
              .requestStream(DefaultPayload.create("Hello"))
                .map(Payload::getDataUtf8)
                .doOnNext(System.out::println)
              .take(10)
              .then()
              .doFinally(signalType -> socket.dispose())
              .then()
              .block();
  }

类似Request-Response(返回Mono),只不过返回的是Flux

requestChannel

    @Test
  public void testRequestChannel(){
      //SERVER
      RSocketFactory.receive()
              .acceptor(new SocketAcceptor(){
                  @Override
                  public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
                      return Mono.just(
                              new AbstractRSocket() {
                                  @Override
                                  public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                                      return Flux.from(payloads)
                                                .map(Payload::getDataUtf8)
                                                .map(s -> "Echo: " + s)
                                                .map(DefaultPayload::create);
                                  }
                              });
                  }
              })
              .transport(TcpServerTransport.create("localhost", 8080))
              .start()
              .subscribe();

      //CLIENT
      RSocket socket =
              RSocketFactory.connect()
                      .transport(TcpClientTransport.create("localhost", 8080))
                      .start()
                      .block();

      socket
              .requestChannel(
                      Flux.interval(Duration.ofMillis(1000)).map(i -> DefaultPayload.create("Hello")))
                .map(Payload::getDataUtf8)
                .doOnNext(System.out::println)
              .take(10)
              .doFinally(signalType -> socket.dispose())
              .then()
              .block();
  }

类似websocket,可以双向通信

MetadataPush

    @Test
  public void testMetadataPush() throws InterruptedException {
      //SERVER
      RSocketFactory.receive()
              .acceptor(
                      (setupPayload, reactiveSocket) ->
                              Mono.just(
                                      new AbstractRSocket() {

                                          @Override
                                          public Mono<Void> metadataPush(Payload payload) {
                                                System.out.printf("metadataPush: %s%n", payload.getDataUtf8());
                                              return Mono.empty();
                                          }
                                      }))
              .transport(TcpServerTransport.create("localhost", 8080))
              .start()
              .subscribe();

      //CLIENT
      RSocket socket =
              RSocketFactory.connect()
                      .transport(TcpClientTransport.create("localhost", 8080))
                      .start()
                      .block();

      socket
              .metadataPush(DefaultPayload.create("hello","version=1.0.0+"))
              .block();

      socket.dispose();

      TimeUnit.SECONDS.sleep(5);
  }
  • RSocket还定义了metadataPush方法,与fireAndForget方法不同的是metadataPush方法会等待data pushed成功,然后在接收到对方发送的complete signal时complete

小结

  • RSocket是一种bi-directional、multiplexed、message-based的二进制协议

  • RSocket有四种Interaction Model,分别是Request-Response、Fire-and-Forget、Request-Stream、Channel

  • RSocket的Frame包含metadata及data payload,其中metadata可选,可以用于描述data payload,因而RSocket还定义了metadataPush方法用于push metadata

doc

  • rsocket.io

以上是关于rsocket-java小试牛刀的主要内容,如果未能解决你的问题,请参考以下文章

java代码块牛刀小试

VSCode Snippet 小试牛刀

小试牛刀-利用AST平坦化一段瑞数代码

小试牛刀-利用AST平坦化一段瑞数代码

Xamarin 小试牛刀 通知栏消息通知和按钮(基于Java代码人肉转换)

zTree初体验——小试牛刀