如何关闭服务器的grpc流
Posted
技术标签:
【中文标题】如何关闭服务器的grpc流【英文标题】:How to close grpc stream for server 【发布时间】:2017-10-25 13:18:35 【问题描述】:从 grpc ,客户端可以调用 CloseSend 关闭流到服务器,但似乎服务器无法切断与客户端的连接。
【问题讨论】:
你为什么知道?什么错误? @jim 你能找到任何解决方案吗? 【参考方案1】:首先,您不应该从连接的角度考虑 gRPC。
一旦处理程序返回error
,ServerStream
将停止发送数据。服务端不需要CloseSend
这样的函数。
如果你想在返回后执行一些逻辑,“只需”生成一个 goroutine。
【讨论】:
这不是一个很好的答案,因为处理程序例程将在 Recv() 中被阻塞,因此您不能“只从处理程序返回错误”。 grpc.io/docs/languages/go/generated-code/… 来自文档:服务器到客户端流的流结束由双向方法处理程序的返回指示。 是的,如果双向方法处理程序在 Recv() 内部阻塞,它将如何返回?有效的构造是使用三个 goroutine;一个只是阻塞在方法处理程序中,然后是两个单独的 goroutine 用于 Recv() 和 Send(),它们之间有通道。【参考方案2】:解决这个问题的最好方法是使用三个 goroutine。
问题是双向处理程序将在 Recv() 中被阻塞,因此当它想要完成流时不能轻易“返回给调用者”。
另一个问题是,如果网络条件不好,或者另一端不合作,Send() 可能会阻塞(这是您首先要终止连接的一个非常常见的原因!)
通常,您还希望从接收者循环之外向发送者发送消息——使用双向流的典型情况是与世界其他地方进行实际通信。对于更加隔离的请求/响应模式,其他模式通常更易于使用。允许同步发送/接收循环之外的人对消息进行排队会导致额外的同步。
因此,您可以像这样在 Bidi 处理程序中构建代码:
-
创建一些可等待原语 - 一个空 chan、一个 WaitGroup 或只是一个带取消的上下文
启动一个循环调用 Recv() 的 goroutine,并分派传入的请求
为传出消息创建一个通道,并具有一定的最大深度
启动另一个从传出 chan 出列的 goroutine,并调用 Send()
在可等待原语上阻止传入的 Bidi 请求处理程序
现在,当您想尝试发送给发送者 chan 时,请在 select
中执行此操作,如果传出 chan 已满,它将转到 default
。您可以将此用作另一端应该断开连接的信号,或者只是丢弃消息以避免无限缓冲,具体取决于需要。
当想要断开客户端与服务器端的连接时,您应该取消阻塞双向处理程序正在阻塞的等待原语。这最终会使 Send() 和 Recv() 函数返回错误,之后您可以安全地声明连接“完成”。
还有远程客户端关闭的常规情况,Recv()调用返回错误,在这种情况下你也需要关闭它。
如果您想真正关闭所涉及的通道,并支持上下文取消以及断开无响应客户端的连接,您最终将不得不将通道与其他互斥锁同步,以避免意外发送已关闭的通道 - 没有否则可以安全地执行此操作,因为 Send()/Recv() 函数在双向处理程序已经返回之前不会解除阻塞。 (如果没有外部发送者,发送者的所有排队都发生在接收 goroutine 中,这不是问题。不过,这仅适用于非常简单的服务器。)
有点伪代码:
func (m *MyThing) MyBidiServer(stream somepb.Thing_ThingServer)
can := make(chan struct)
thingRunner := &ThingRunner
cancel: can,
send: make(chan *somepb.OutgoingMessage, 100),
m.addRunner(thingRunner) // assume this is mutexed
defer m.removeRunner(thingRunner)
go thingRunner.recvLoop(stream)
go thingRunner.sendLoop(stream)
// could be select here if you also have a context
<-can
func (t *ThingRunner) recvLoop(stream somepb.Thing_ThingServer)
for
msg, err := stream.Recv()
if err != nil
break
t.cancelSafely() // in case client disconnected
func (t *ThingRunner) sendLoop(stream somepb.Thing_ThingServer)
for msg := range t.send
if err := stream.Send(msg); err != nil
break
t.cancelSafely() // in case of network error
func (t *ThingRunner) SendMessage(ctx context.Context, msg *somepb.Message) error
if t.isCanceled()
return ThingClosedError
select
case t.send <- msg:
// all is well
case <-ctx.Done():
return ctx.Err()
default:
// the client is too slow -- disconnect it
t.cancelSafely()
return ClientTooSlowError
return nil
您还需要处理一些小的管理方面的问题,但这就是要点。 cancelSafely() 需要解除对 cancel chan 的阻塞,但它也需要不导致 t.send 上的未来写入恐慌,因为在关闭的 chan 上发送将无条件恐慌。
【讨论】:
以上是关于如何关闭服务器的grpc流的主要内容,如果未能解决你的问题,请参考以下文章
gRPC (Cpp) Streaming - 如果在 grpc_impl::ServerReaderWriter::Read 期间一侧挂起或忘记关闭流,会发生啥?