如何关闭服务器的grpc流

Posted

技术标签:

【中文标题】如何关闭服务器的grpc流【英文标题】:How to close grpc stream for server 【发布时间】:2017-10-25 13:18:35 【问题描述】:

从 grpc ,客户端可以调用 CloseSend 关闭流到服务器,但似乎服务器无法切断与客户端的连接。

【问题讨论】:

你为什么知道?什么错误? @jim 你能找到任何解决方案吗? 【参考方案1】:

首先,您不应该从连接的角度考虑 gRPC。 一旦处理程序返回errorServerStream 将停止发送数据。服务端不需要CloseSend这样的函数。

如果你想在返回后执行一些逻辑,“只需”生成一个 goroutine。

【讨论】:

这不是一个很好的答案,因为处理程序例程将在 Recv() 中被阻塞,因此您不能“只从处理程序返回错误”。 grpc.io/docs/languages/go/generated-code/… 来自文档:服务器到客户端流的流结束由双向方法处理程序的返回指示。 是的,如果双向方法处理程序在 Recv() 内部阻塞,它将如何返回?有效的构造是使用三个 goroutine;一个只是阻塞在方法处理程序中,然后是两个单独的 goroutine 用于 Recv() 和 Send(),它们之间有通道。【参考方案2】:

解决这个问题的最好方法是使用三个 goroutine。

问题是双向处理程序将在 Recv() 中被阻塞,因此当它想要完成流时不能轻易“返回给调用者”。

另一个问题是,如果网络条件不好,或者另一端不合作,Send() 可能会阻塞(这是您首先要终止连接的一个非常常见的原因!)

通常,您还希望从接收者循环之外向发送者发送消息——使用双向流的典型情况是与世界其他地方进行实际通信。对于更加隔离的请求/响应模式,其他模式通常更易于使用。允许同步发送/接收循环之外的人对消息进行排队会导致额外的同步。

因此,您可以像这样在 Bidi 处理程序中构建代码:

    创建一些可等待原语 - 一个空 chan、一个 WaitGroup 或只是一个带取消的上下文 启动一个循环调用 Recv() 的 goroutine,并分派传入的请求 为传出消息创建一个通道,并具有一定的最大深度 启动另一个从传出 chan 出列的 goroutine,并调用 Send() 在可等待原语上阻止传入的 Bidi 请求处理程序

现在,当您想尝试发送给发送者 chan 时,请在 select 中执行此操作,如果传出 chan 已满,它将转到 default。您可以将此用作另一端应该断开连接的信号,或者只是丢弃消息以避免无限缓冲,具体取决于需要。

当想要断开客户端与服务器端的连接时,您应该取消阻塞双向处理程序正在阻塞的等待原语。这最终会使 Send() 和 Recv() 函数返回错误,之后您可以安全地声明连接“完成”。

还有远程客户端关闭的常规情况,Recv()调用返回错误,在这种情况下你也需要关闭它。

如果您想真正关闭所涉及的通道,并支持上下文取消以及断开无响应客户端的连接,您最终将不得不将通道与其他互斥锁同步,以避免意外发送已关闭的通道 - 没有否则可以安全地执行此操作,因为 Send()/Recv() 函数在双向处理程序已经返回之前不会解除阻塞。 (如果没有外部发送者,发送者的所有排队都发生在接收 goroutine 中,这不是问题。不过,这仅适用于非常简单的服务器。)

有点伪代码:

func (m *MyThing) MyBidiServer(stream somepb.Thing_ThingServer) 
    can := make(chan struct)
    thingRunner := &ThingRunner
        cancel: can,
        send: make(chan *somepb.OutgoingMessage, 100),
    
    m.addRunner(thingRunner) // assume this is mutexed
    defer m.removeRunner(thingRunner)
    go thingRunner.recvLoop(stream)
    go thingRunner.sendLoop(stream)
    // could be select here if you also have a context
    <-can


func (t *ThingRunner) recvLoop(stream somepb.Thing_ThingServer) 
    for 
        msg, err := stream.Recv()
        if err != nil 
            break
        
    
    t.cancelSafely() // in case client disconnected


func (t *ThingRunner) sendLoop(stream somepb.Thing_ThingServer) 
    for msg := range t.send 
        if err := stream.Send(msg); err != nil 
            break
        
    
    t.cancelSafely() // in case of network error


func (t *ThingRunner) SendMessage(ctx context.Context, msg *somepb.Message) error 
    if t.isCanceled() 
        return ThingClosedError
    
    select 
        case t.send <- msg:
            // all is well
        case <-ctx.Done():
            return ctx.Err()
        default:
            // the client is too slow -- disconnect it
            t.cancelSafely()
            return ClientTooSlowError
    
    return nil

您还需要处理一些小的管理方面的问题,但这就是要点。 cancelSafely() 需要解除对 cancel chan 的阻塞,但它也需要不导致 t.send 上的未来写入恐慌,因为在关闭的 chan 上发送将无条件恐慌。

【讨论】:

以上是关于如何关闭服务器的grpc流的主要内容,如果未能解决你的问题,请参考以下文章

grpc-java:正确处理客户端重试以进行服务流调用

gRPC 客户端流是如何实现的

基于 gRPC 和 .NET Core 的服务器流

gRPC (Cpp) Streaming - 如果在 grpc_impl::ServerReaderWriter::Read 期间一侧挂起或忘记关闭流,会发生啥?

i.grpc.internal.AbstractClientStream - 接收到的关于关闭流的数据意义

GRPC Java 服务器服务