等待最后一个元素时阻塞 Flux

Posted

技术标签:

【中文标题】等待最后一个元素时阻塞 Flux【英文标题】:Blocked Flux while waiting for last element 【发布时间】:2020-01-14 13:54:10 【问题描述】:

我想通过 rsocket 连接两个应用程序。一个是用 GO 编写的,第二个是用 Kotlin 编写的。 我想实现客户端发送数据流和服务器发送确认响应的连接。

问题在于等待所有元素,如果服务器没有 BlockOnLast(ctx),则读取整个流,但在所有条目到达之前发送响应。如果添加了 BlockOnLast(ctx),则 Server (GoLang) 卡住了。

我还在 Kotlin 中编写了客户端,在这种情况下,整个通信工作得非常好。

有人可以帮忙吗?

GO服务器:

package main

import (
"context"
"github.com/golang/protobuf/proto"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/flux"
"rsocket-go-rpc-test/proto"
)

func main() 
addr := "tcp://127.0.0.1:8081"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := rsocket.Receive().
    Fragment(1024).
    Resume().
    Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) 
        return rsocket.NewAbstractSocket(
            rsocket.RequestChannel(func(payloads rx.Publisher) flux.Flux 
                println("START")

                payloads.(flux.Flux).
                    DoOnNext(func(input payload.Payload) 
                        chunk := &pl_dwojciechowski_proto.Chunk
                        proto.Unmarshal(input.Data(), chunk)
                        println(string(chunk.Content))
                    ).BlockLast(ctx)

                return flux.Create(func(i context.Context, sink flux.Sink) 
                    status, _ := proto.Marshal(&pl_dwojciechowski_proto.UploadStatus
                        Message: "OK",
                        Code:    0,
                    )

                    sink.Next(payload.New(status, make([]byte, 1)))
                    sink.Complete()
                    println("SENT")
                )
            ),
        ), nil
    ).
    Transport(addr).
    Serve(ctx)
panic(err)


Kotlin 客户端:

private fun clientCall() 
val rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(8081)).start().block()
val client = FileServiceClient(rSocket)

val requests: Flux<Chunk> = Flux.range(1, 10)
    .map  i: Int -> "sending -> $i" 
    .map<Chunk> 
        Chunk.newBuilder()
            .setContent(ByteString.copyFrom(it.toByteArray())).build()
    

val response = client.send(requests).block() ?: throw Exception("")
rSocket.dispose()
System.out.println(response.message)


相当于用 Kotlin 编写的 GO:

    val serviceServer = FileServiceServer(DefaultService(), Optional.empty(), Optional.empty())
val closeableChannel = RSocketFactory.receive()
    .acceptor  setup: ConnectionSetupPayload?, sendingSocket: RSocket? ->
        Mono.just(
            RequestHandlingRSocket(serviceServer)
        )
    
    .transport(TcpServerTransport.create(8081))
    .start()
    .block()
    closeableChannel.onClose().block()

class DefaultService : FileService 
override fun send(messages: Publisher<Service.Chunk>?, metadata: ByteBuf?): Mono<Service.UploadStatus> 
    return Flux.from(messages)
        .windowTimeout(10, Duration.ofSeconds(500))
        .flatMap(Function.identity())
        .doOnNext  println(it.content.toStringUtf8()) 
        .then(Mono.just(Service.UploadStatus.newBuilder().setCode(Service.UploadStatusCode.Ok).setMessage("test").build()))


服务器输出:

START
sending -> 1

【问题讨论】:

【参考方案1】:

解决方法如下:

package main
import (
   "context"
   "github.com/golang/protobuf/proto"
   "github.com/rsocket/rsocket-go"
   "github.com/rsocket/rsocket-go/payload"
   "github.com/rsocket/rsocket-go/rx"
   "github.com/rsocket/rsocket-go/rx/flux"
   "rsocket-go-rpc-test/proto"
)
type TestService struct 
   totals int
pl_dwojciechowski_proto.FileService

var statusOK = &pl_dwojciechowski_proto.UploadStatus
   Message: "code",
Code:    pl_dwojciechowski_proto.UploadStatusCode_Ok,

var statusErr = &pl_dwojciechowski_proto.UploadStatus
   Message: "code",
Code:    pl_dwojciechowski_proto.UploadStatusCode_Failed,

func main() 
   addr := "tcp://127.0.0.1:8081"
ctx, cancel := context.WithCancel(context.Background())
   defer cancel()
   err := rsocket.Receive().
      Fragment(1024).
      Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) 
         return rsocket.NewAbstractSocket(
            rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux 
               dataReceivedChan := make(chan bool, 1)
               toChan, _ := flux.Clone(msgs).
                  DoOnError(func(e error) 
                     dataReceivedChan <- false
).
                  DoOnComplete(func() 
                     dataReceivedChan <- true
).
                  ToChan(ctx, 1)
               fluxResponse := flux.Create(func(ctx context.Context, s flux.Sink) 
                  gluedContent := make([]byte, 1024)
                  for c := range toChan 
                     chunk := pl_dwojciechowski_proto.Chunk
                     _ = chunk.XXX_Unmarshal(c.Data())
                     gluedContent = append(gluedContent, chunk.Content...)
                  
                  if <-dataReceivedChan 
                     marshal, _ := proto.Marshal(statusOK)
                     s.Next(payload.New(marshal, nil))
                     s.Complete()
                   else 
                     marshal, _ := proto.Marshal(statusErr)
                     s.Next(payload.New(marshal, nil))
                     s.Complete()
                  
               )
               return fluxResponse
),
), nil
).
      Transport(addr).
      Serve(ctx)
   panic(err)

【讨论】:

以上是关于等待最后一个元素时阻塞 Flux的主要内容,如果未能解决你的问题,请参考以下文章

阻塞队列--概述

在使用 Flux 时顺序调用非阻塞操作,包括重试

java的monitor机制中,为啥阻塞队列用list等待队列用set

BlockingQueu 阻塞队列

如何在不阻塞的情况下从 Flux 获取列表?

list