如何解决 go 和 grpc 中的 pub-sub 问题?
Posted
技术标签:
【中文标题】如何解决 go 和 grpc 中的 pub-sub 问题?【英文标题】:How to solve pub-sub problem in go and grpc? 【发布时间】:2021-12-01 04:38:49 【问题描述】:在 go grpc 服务中,我有一个接收者(发布者)事件循环,发布者可以检测到它希望发送者停止。但是通道原则说我们不应该在接收方关闭通道,只在发送方关闭。它应该如何受到威胁?
情况如下。想象一下聊天。第一个客户端 - 订阅者 - 接收消息,由于 grpc 限制,没有 goroutine 无法完成其流式传输。第二个客户端 - 发布者正在向聊天发送消息,所以它是另一个 goroutine。您必须将消息从发布者传递给订阅者接收客户端,前提是订阅者未关闭其连接(强制从接收者端关闭通道)
代码中的问题:
//1st client goroutine - subscriber
func (s *GRPCServer) WatchMessageServer(req *WatchMessageRequest, stream ExampleService_WatchMessageServer) error
ch := s.NewClientChannel()
// natively blocks goroutine with send to its stream, until send gets an error
for
msg, ok := <-ch
if !ok
return nil
err := stream.Send(msg) // if this fails - we need to close ch from receiver side to "propagate" closing signal
if err != nil
return err
//2nd client goroutine - publisher
func (s *GRPCServer) SendMessage(ctx context.Context, req *SendMessageRequest) (*emptypb.Empty, error)
for i := range s.clientChannels
s.clientChannels[i] <- req
// no way other than panic, to know when to remove channel from list. or need to make a wrapper with recover..
return nil
【问题讨论】:
关闭通道上的发送操作出现恐慌,期间。您要么必须在发送者和接收者之间进行协调,要么设计一个不同的解决方案。如果没有一个实际的例子,你应该如何做到这一点很难说。 @JimB 也许解决方案只是处理恐慌?我在 PS2 部分提供了示例。 不要通过恐慌恢复“解决”这个问题 - 遵循 @JimB 的建议并考虑重新设计。 您可能需要等待一对频道。一种供服务器传输,另一种供客户端发出其连接已关闭的信号。 gRPCclient
和 server
可以(并且可能)在完全不同的网络/硬件/内存空间上运行。 Go
渠道沟通在这里毫无意义。如果客户端想要中止流 - 关闭流。服务器将通过连接context.Context
看到这一点 - 因此,当检测到此事件时停止服务。
【参考方案1】:
我最初通过搜索得到了一个线索,感谢这个答案,answer here 中提供了解决方案。
提供流解决方案示例代码,我猜它是通用发布-订阅问题的实现:
//1st client goroutine - subscriber
func (s *GRPCServer) WatchMessageServer(req *WatchMessageRequest, stream ExampleService_WatchMessageServer) error
s.AddClientToBroadcastList(stream)
select
case <-stream.Context().Done(): // *** promised that it would signal when client closes stream
return stream.Context().Err() // stream will be closed immediately after return
case <-s.ctx.Done(): // program shutdown
return s.ctx.Err()
//2nd client goroutine - publisher
func (s *GRPCServer) SendMessage(ctx context.Context, req *SendMessageRequest) (*emptypb.Empty, error)
for i := range s.clientStreams
err := s.clientStreams.Send(req)
if err != nil
s.RemoveClientFromBroadcastList(s.clientStreams[i])
return nil
【讨论】:
以上是关于如何解决 go 和 grpc 中的 pub-sub 问题?的主要内容,如果未能解决你的问题,请参考以下文章