如何解决 go 和 grpc 中的 pub-sub 问题?

Posted

技术标签:

【中文标题】如何解决 go 和 grpc 中的 pub-sub 问题?【英文标题】:How to solve pub-sub problem in go and grpc? 【发布时间】:2021-12-01 04:38:49 【问题描述】:

在 go grpc 服务中,我有一个接收者(发布者)事件循环,发布者可以检测到它希望发送者停止。但是通道原则说我们不应该在接收方关闭通道,只在发送方关闭。它应该如何受到威胁?

情况如下。想象一下聊天。第一个客户端 - 订阅者 - 接收消息,由于 grpc 限制,没有 goroutine 无法完成其流式传输。第二个客户端 - 发布者正在向聊天发送消息,所以它是另一个 goroutine。您必须将消息从发布者传递给订阅者接收客户端,前提是订阅者未关闭其连接(强制从接收者端关闭通道)

代码中的问题:

//1st client goroutine - subscriber
func (s *GRPCServer) WatchMessageServer(req *WatchMessageRequest, stream ExampleService_WatchMessageServer) error 
    ch := s.NewClientChannel()
    // natively blocks goroutine with send to its stream, until send gets an error
    for 
        msg, ok := <-ch
        if !ok 
            return nil
        
        err := stream.Send(msg) // if this fails - we need to close ch from receiver side to "propagate" closing signal
        if err != nil 
            return err
        
    


//2nd client goroutine - publisher
func (s *GRPCServer) SendMessage(ctx context.Context, req *SendMessageRequest) (*emptypb.Empty, error) 
    for i := range s.clientChannels 
        s.clientChannels[i] <- req
        // no way other than panic, to know when to remove channel from list. or need to make a wrapper with recover..
    
    return nil

【问题讨论】:

关闭通道上的发送操作出现恐慌,期间。您要么必须在发送者和接收者之间进行协调,要么设计一个不同的解决方案。如果没有一个实际的例子,你应该如何做到这一点很难说。 @JimB 也许解决方案只是处理恐慌?我在 PS2 部分提供了示例。 不要通过恐慌恢复“解决”这个问题 - 遵循 @JimB 的建议并考虑重新设计。 您可能需要等待一对频道。一种供服务器传输,另一种供客户端发出其连接已关闭的信号。 gRPC clientserver 可以(并且可能)在完全不同的网络/硬件/内存空间上运行。 Go 渠道沟通在这里毫无意义。如果客户端想要中止流 - 关闭流。服务器将通过连接context.Context 看到这一点 - 因此,当检测到此事件时停止服务。 【参考方案1】:

我最初通过搜索得到了一个线索,感谢这个答案,answer here 中提供了解决方案。

提供流解决方案示例代码,我猜它是通用发布-订阅问题的实现:

//1st client goroutine - subscriber
func (s *GRPCServer) WatchMessageServer(req *WatchMessageRequest, stream ExampleService_WatchMessageServer) error 
    s.AddClientToBroadcastList(stream)
    select 
    case <-stream.Context().Done(): // *** promised that it would signal when client closes stream
        return stream.Context().Err() // stream will be closed immediately after return
    case <-s.ctx.Done(): // program shutdown
        return s.ctx.Err()
    


//2nd client goroutine - publisher
func (s *GRPCServer) SendMessage(ctx context.Context, req *SendMessageRequest) (*emptypb.Empty, error) 
    for i := range s.clientStreams 
        err := s.clientStreams.Send(req)
        if err != nil 
            s.RemoveClientFromBroadcastList(s.clientStreams[i])
        
    
    return nil

【讨论】:

以上是关于如何解决 go 和 grpc 中的 pub-sub 问题?的主要内容,如果未能解决你的问题,请参考以下文章

grpc/go 如何在 grpc.Dial 中设置 grpc.ssl_target_name_override

如何调试 gRPC-Go 服务?

gRPC-go源码:连接管理

gRPC-go源码:连接管理

Go 中的 gRPC 入门详解

PHP使用gRPC请求GO服务实战