如何在 gRPC 中从服务器到客户端进行广播?

Posted

技术标签:

【中文标题】如何在 gRPC 中从服务器到客户端进行广播?【英文标题】:How to broadcast in gRPC from server to client? 【发布时间】:2018-09-09 20:54:12 【问题描述】:

我现在正在 gRPC 中创建一个小型聊天应用程序,我遇到了一个问题,如果用户想作为客户端连接到 gRPC 服务器,我想广播该事件已发生所有其他连接的客户端。

我正在考虑使用某种观察者,但我对服务器如何知道谁连接以及我如何将事件广播给所有客户端而不仅仅是一两个客户端感到困惑。

我知道使用流是答案的一部分,但是因为每个客户端都在使用服务器创建自己的流,所以我不确定它如何订阅其他服务器-客户端流。

【问题讨论】:

保留已连接客户端的列表。 ***.com/questions/41583218/… 表示客户没有实际的唯一标识符 正如您刚刚链接的问题的答案所述,“一种可能的解决方案是应用程序级别的握手协议。您可以添加 rpc 方法 'Connect' 并发送 clientId 作为来自服务器的响应。之后你可以将自定义标头(元数据)附加到您的 rpc 调用。" 嗯...对不起,我错过了。我在想也许使用 gRPC 支持的双向流将是答案,但我想像客户端列表之类的东西会起作用(尽管每次向服务器发出请求时都必须循环遍历所有客户端感觉很奇怪) 如果您想向所有客户广播一条消息,您将不得不循环浏览它们 - 我看不出有任何可以避免的方式。 【参考方案1】:

另一种选择是使用长轮询方法。那就是尝试类似下面的东西(Python中的代码,因为这是我最熟悉的,但go应该非常相似)。这未经测试,旨在让您了解如何在 gRPC 中进行长轮询:

.PROTO defs
-------------------------------------------------
service Updater 
    rpc GetUpdates(GetUpdatesRequest) returns (GetUpdatesResponse);


message GetUpdatesRequest 
    int64 last_received_update = 1;


message GetUpdatesResponse 
    repeated Update updates = 1;
    int64 update_index = 2;


message Update 
    // your update structure



SERVER
-----------------------------------------------------------
class UpdaterServer(UpdaterServicer):
    def __init__(self):
        self.condition = threading.Condition()
        self.updates = []

    def post_update(self, update):
        """
        Used whenever the clients should be updated about something. It will
        trigger their long-poll calls to return
        """
        with self.condition:
            # TODO: You should probably remove old updates after some time
            self.updates.append(updates)
            self.condition.notify_all()

    def GetUpdates(self, req, context):
        with self.condition:
            while self.updates[req.last_received_update + 1:] == []:
                self.condition.wait()
            new_updates = self.updates[req.last_received_update + 1:]
            response = GetUpdatesResponse()
            for update in new_updates:
                response.updates.add().CopyFrom(update)
            response.update_index = req.last_received_update + len(new_updates)
            return response


SEPARATE THREAD IN THE CLIENT
----------------------------------------------
request = GetUpdatesRequest()
request.last_received_update = -1
while True:
    stub = UpdaterStub(channel)
    try:
        response = stub.GetUpdates(request, timeout=60*10)
        handle_updates(response.updates)
        request.last_received_update = response.update_index
    except grpc.FutureTimeoutError:
        pass

【讨论】:

【参考方案2】:

是的,除了保持一个包含所有连接流并循环遍历它们的全局数据结构,我没有看到任何其他方法,告诉每个流刚刚发生的事件。

【讨论】:

如何获取已连接的流,以便将其放入列表?【参考方案3】:

另一种方法是在客户端也生成一个 grpc-server。在应用程序级别,您需要从客户端到服务器进行一些握手,以交换客户端 grpc-server ip 和端口。此时您可能希望为该地址创建一个客户端并将该客户端存储在一个列表中。

现在您可以使用默认的一元 RPC 调用从列表中向客户端推送消息。不需要 [bidi] 流。 优点:

可以将客户端“Push”-API 与服务器 API 分开。 一元 RPC 推送调用。

缺点:

附加“服务器”。不知道这是否适用于所有场景。

【讨论】:

【参考方案4】:

需要一个全局的map 结构,您可以为每个连接创建一个新的chan。我想出的是一个处理全局map结构的中间通道。

服务器流式传输示例:

func (s *server) Subscribe(req *pb.SubscribeRequest, srv pb.SubscribeServer) error 
    //get trace id or generated a random string or whatever you want to indicate this goroutine
    ID:="randomString"
    //create a chan to receive response message
    conn := make(chan *pb.SubscribeResponse)
    //an intermediate channel which has the ownership of the `map`
    s.broadcast <- &broadcastPayload 
        //an unique identifier
        ID: ID
        //the chan corresponse to the ID
        Conn: conn
        //event to indicate add, remove or send message to broadcast channel
        Event: EventEnum.AddConnection
    
    
    for 
        select   
            case <-srv.Context().Done():  
                s.broadcast <- &entity.BroadcastPayload  
                     ID: ID,
                     Event: EventEnum.RemoveConnection
                
                return nil  
            case response := <-conn:  
                if status, ok := status.FromError(srv.Send(response)); ok   
                    switch status.Code()   
                    case codes.OK:  
                        //noop  
                    case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:  
                        return nil  
                    default:  
                        return nil  
               
         
    

对于broadcast go 例程:

//this goroutine has the ownership of the map[string]chan *pb.SubscribeResponse
go func()
    for v:=range s.broadcast 
        //do something based on the event
        switch v.Event 
            //add the ID and conn to the map
            case EventEnum.AddConnection:
                ...
            //delete map key and close conn channel here
            case EventEnum.RemoveConnection:
                ...
            //receive message from business logic, send the message to suiteable conn in the map as you like
            case EventEnum.ReceiveResponse:
                ...
        
    

我放了一些细节here

【讨论】:

以上是关于如何在 gRPC 中从服务器到客户端进行广播?的主要内容,如果未能解决你的问题,请参考以下文章

如何使gRPC 获得最佳性能?

gRPC系列:什么是gRPC API,它如何工作?

[go微服务-17] gRPC和 Apache Thrift 之间 如何进行选型?

gRPC 客户端流是如何实现的

如何配置 grpc 客户端的源 IP 地址和/或端口?

如何在 Google Cloud Platform 上部署 GRPC 服务器和客户端?