扩展 gRPC 双向流式聊天服务

Posted

技术标签:

【中文标题】扩展 gRPC 双向流式聊天服务【英文标题】:Scaling gRPC bidirectional streaming chat service 【发布时间】:2021-01-27 06:06:36 【问题描述】:

我正在用 gRPC java 起草一个双向流的聊天服务。简化流程如下,

    当用户加入聊天服务时,用户的StreamObserver将存储在聊天室存储库中,即服务器中保存userId - StreamObserver的简单HashMap。 一段时间后,当用户发送聊天消息时,服务器收到请求并通过迭代存储在聊天室存储库中的StreamObservers 并调用onNext 方法将消息广播给聊天室中的所有用户。

当只有 1 台服务器存在时,这可以正常工作,但是一旦扩展到多台服务器,客户端的 StreamObservers 将存储在特定服务器中,并且不会存在于其他服务器中,因为 gRPC 会打开单个 HTTP 连接到服务器最初连接。

我需要通过将所有StreamObservers 分散在服务器周围来将消息发送给同一聊天室中的所有用户,有没有人对这种情况有好的经验?我尝试将StreamObserver 存储在单个存储中,但是由于它不可序列化,我无法将其存储在像 redis 这样的共享存储中。

【问题讨论】:

您必须让客户端连接到所有服务器实例,方法是使用NameResolver 监听所有实例,然后使用负载平衡策略向服务器发送消息。这是一个例子sultanov.dev/blog/grpc-client-side-load-balancing。你已经这样做了吗? 你能发布客户端的代码来监听来自所有 gRPC 服务器实现的消息吗? @Felipe 谢谢!从来没有想过客户端负载平衡。将看看并尝试。看来我也需要看看this page。客户端代码尽可能简单。它在双向流方法中定义了一个 StreamObserver。我先看看客户端的LB,如果有其他问题也会贴出客户端代码。再次感谢! 我按照链接所述实现了代码。它确实负载平衡。但是聊天不起作用。我正在研究它(我喜欢这个问题=))。我会把答案发给你。 【参考方案1】:

我使用 gRPC 和 3 个负载平衡服务器实现了聊天。实现负载平衡的第一件事是使用ManagedChanneldefaultLoadBalancingPolicy。就我而言,我使用了round_robin 策略。并使用带有三个服务器的主机和端口的MultiAddressNameResolverFactory 创建通道。在这里,我为 Alice 创建了一个客户端聊天。然后你复制这个类并为 Bob 创建一个客户端聊天。这应该已经完成​​了您要求的负载平衡。

public class ChatClientAlice 
    private NameResolver.Factory nameResolverFactory;
    private ManagedChannel channel;

    public static void main(String[] args) 
        ChatClientAlice chatClientAlice = new ChatClientAlice();
        chatClientAlice.createChannel();
        chatClientAlice.runBiDiStreamChat();
        chatClientAlice.closeChannel();
    

    private void createChannel() 
        nameResolverFactory = new MultiAddressNameResolverFactory(
                new InetSocketAddress("localhost", 50000),
                new InetSocketAddress("localhost", 50001),
                new InetSocketAddress("localhost", 50002)
        );
        channel = ManagedChannelBuilder.forTarget("service")
                .nameResolverFactory(nameResolverFactory)
                .defaultLoadBalancingPolicy("round_robin")
                .usePlaintext()
                .build();
    

    private void closeChannel()  channel.shutdown(); 

    private void runBiDiStreamChat() 
        System.out.println("creating Bidirectional stream stub for Alice");
        EchoServiceGrpc.EchoServiceStub asyncClient = EchoServiceGrpc.newStub(channel);
        CountDownLatch latch = new CountDownLatch(1);

        StreamObserver<EchoRequest> requestObserver = asyncClient.echoBiDi(new StreamObserver<EchoResponse>() 
            @Override
            public void onNext(EchoResponse value)  System.out.println("chat: " + value.getMessage()); 

            @Override
            public void onError(Throwable t)  latch.countDown(); 

            @Override
            public void onCompleted()  latch.countDown(); 
        );

        Stream.iterate(0, i -> i + 1)
                .limit(10)
                .forEach(integer -> 
                    String msg = "Hello, I am " + ChatClientAlice.class.getSimpleName() + "! I am sending stream message number " + integer + ".";
                    System.out.println("Alice says: " + msg);
                    EchoRequest request = EchoRequest.newBuilder()
                            .setMessage(msg)
                            .build();
                    requestObserver.onNext(request);
                    // throttle the stream
                    try  Thread.sleep(5000);  catch (InterruptedException e)  
                );
        requestObserver.onCompleted();
        System.out.println("Alice BiDi stream is done.");
        try 
            // wait for the time set on the stream + the throttle
            latch.await((5000 * 20), TimeUnit.MILLISECONDS);
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

在服务器服务上,每次收到来自新客户端的新请求时,您都必须使用单例来存储 StreamObservers。不是将消息返回给单个观察者responseObserver.onNext(response);,而是迭代所有观察者并将消息发送给所有singletonObservers.getObservers().forEach(....。尽管这与负载平衡策略无关,但我认为值得发布,因为如果您没有很好地实现它,您的客户端将不会收到来自其他客户端的消息。

public class ChatServiceImpl extends EchoServiceGrpc.EchoServiceImplBase 

    private final String name;
    private final SingletlonChatStreamObserver singletonObservers;

    ChatServiceImpl(String name) 
        this.name = name;
        this.singletonObservers = SingletlonChatStreamObserver.getInstance();
    

    @Override
    public StreamObserver<EchoRequest> echoBiDi(StreamObserver<EchoResponse> responseObserver) 
        System.out.println("received bidirectional call");

        singletonObservers.addObserver(responseObserver);
        System.out.println("added responseObserver to the pool of observers: " + singletonObservers.getObservers().size());

        StreamObserver<EchoRequest> requestObserver = new StreamObserver<EchoRequest>() 
            @Override
            public void onNext(EchoRequest value) 
                String msg = value.getMessage();
                System.out.println("received message: " + msg);
                EchoResponse response = EchoResponse.newBuilder()
                        .setMessage(msg)
                        .build();
                // do not send messages to a single observer but to all observers on the pool
                // responseObserver.onNext(response);
                // observers.foreach...
                singletonObservers.getObservers().forEach(observer -> 
                    observer.onNext(response);
                );
            

            @Override
            public void onError(Throwable t) 
                // observers.remove(responseObserver);
                singletonObservers.getObservers().remove(responseObserver);
                System.out.println("removed responseObserver to the pool of observers");
            

            @Override
            public void onCompleted() 
                // do not complete messages to a single observer but to all observers on the pool
                // responseObserver.onCompleted();
                // observers.foreach
                singletonObservers.getObservers().forEach(observer -> 
                    observer.onCompleted();
                );

                // observers.remove(responseObserver);
                System.out.println("removed responseObserver to the pool of observers");
            
        ;
        return requestObserver;
    

这是我的SingletlonChatStreamObserver 为所有 3 台服务器只有一个对象:

public class SingletlonChatStreamObserver implements Serializable 

    private static volatile SingletlonChatStreamObserver singletonSoleInstance;
    private static volatile ArrayList<StreamObserver<EchoResponse>> observers;

    private SingletlonChatStreamObserver() 
        //Prevent form the reflection api.
        if (singletonSoleInstance != null) 
            throw new RuntimeException("Use getInstance() method to get the single instance of this class.");
        
    

    public static SingletlonChatStreamObserver getInstance() 
        if (singletonSoleInstance == null)  //if there is no instance available... create new one
            synchronized (SingletlonChatStreamObserver.class) 
                if (singletonSoleInstance == null) 
                    observers = new ArrayList<StreamObserver<EchoResponse>>();
                    singletonSoleInstance = new SingletlonChatStreamObserver();
                
            
        
        return singletonSoleInstance;
    

    //Make singleton from serializing and deserialize operation.
    protected SingletlonChatStreamObserver readResolve() 
        return getInstance();
    

    public void addObserver(StreamObserver<EchoResponse> streamObserver) 
        observers.add(streamObserver);
    

    public ArrayList<StreamObserver<EchoResponse>> getObservers() 
        return observers;
    

我将在我的explore-grpc 项目中提交完整的代码。

【讨论】:

以上是关于扩展 gRPC 双向流式聊天服务的主要内容,如果未能解决你的问题,请参考以下文章

第八节——实现双向流式GRPC

第八节——实现双向流式GRPC

基于grpc的流式方式实现双向通讯(python)

gRPC 流式传输极简入门指南

gRPC 流式传输极简入门指南

gRPC 流式传输极简入门指南