扩展 gRPC 双向流式聊天服务
Posted
技术标签:
【中文标题】扩展 gRPC 双向流式聊天服务【英文标题】:Scaling gRPC bidirectional streaming chat service 【发布时间】:2021-01-27 06:06:36 【问题描述】:我正在用 gRPC java 起草一个双向流的聊天服务。简化流程如下,
-
当用户加入聊天服务时,用户的
StreamObserver
将存储在聊天室存储库中,即服务器中保存userId - StreamObserver
的简单HashMap
。
一段时间后,当用户发送聊天消息时,服务器收到请求并通过迭代存储在聊天室存储库中的StreamObserver
s 并调用onNext
方法将消息广播给聊天室中的所有用户。
当只有 1 台服务器存在时,这可以正常工作,但是一旦扩展到多台服务器,客户端的 StreamObserver
s 将存储在特定服务器中,并且不会存在于其他服务器中,因为 gRPC 会打开单个 HTTP 连接到服务器最初连接。
我需要通过将所有StreamObserver
s 分散在服务器周围来将消息发送给同一聊天室中的所有用户,有没有人对这种情况有好的经验?我尝试将StreamObserver
存储在单个存储中,但是由于它不可序列化,我无法将其存储在像 redis 这样的共享存储中。
【问题讨论】:
您必须让客户端连接到所有服务器实例,方法是使用NameResolver
监听所有实例,然后使用负载平衡策略向服务器发送消息。这是一个例子sultanov.dev/blog/grpc-client-side-load-balancing。你已经这样做了吗?
你能发布客户端的代码来监听来自所有 gRPC 服务器实现的消息吗?
@Felipe 谢谢!从来没有想过客户端负载平衡。将看看并尝试。看来我也需要看看this page。客户端代码尽可能简单。它在双向流方法中定义了一个 StreamObserver。我先看看客户端的LB,如果有其他问题也会贴出客户端代码。再次感谢!
我按照链接所述实现了代码。它确实负载平衡。但是聊天不起作用。我正在研究它(我喜欢这个问题=))。我会把答案发给你。
【参考方案1】:
我使用 gRPC 和 3 个负载平衡服务器实现了聊天。实现负载平衡的第一件事是使用ManagedChannel
和defaultLoadBalancingPolicy
。就我而言,我使用了round_robin
策略。并使用带有三个服务器的主机和端口的MultiAddressNameResolverFactory
创建通道。在这里,我为 Alice 创建了一个客户端聊天。然后你复制这个类并为 Bob 创建一个客户端聊天。这应该已经完成了您要求的负载平衡。
public class ChatClientAlice
private NameResolver.Factory nameResolverFactory;
private ManagedChannel channel;
public static void main(String[] args)
ChatClientAlice chatClientAlice = new ChatClientAlice();
chatClientAlice.createChannel();
chatClientAlice.runBiDiStreamChat();
chatClientAlice.closeChannel();
private void createChannel()
nameResolverFactory = new MultiAddressNameResolverFactory(
new InetSocketAddress("localhost", 50000),
new InetSocketAddress("localhost", 50001),
new InetSocketAddress("localhost", 50002)
);
channel = ManagedChannelBuilder.forTarget("service")
.nameResolverFactory(nameResolverFactory)
.defaultLoadBalancingPolicy("round_robin")
.usePlaintext()
.build();
private void closeChannel() channel.shutdown();
private void runBiDiStreamChat()
System.out.println("creating Bidirectional stream stub for Alice");
EchoServiceGrpc.EchoServiceStub asyncClient = EchoServiceGrpc.newStub(channel);
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<EchoRequest> requestObserver = asyncClient.echoBiDi(new StreamObserver<EchoResponse>()
@Override
public void onNext(EchoResponse value) System.out.println("chat: " + value.getMessage());
@Override
public void onError(Throwable t) latch.countDown();
@Override
public void onCompleted() latch.countDown();
);
Stream.iterate(0, i -> i + 1)
.limit(10)
.forEach(integer ->
String msg = "Hello, I am " + ChatClientAlice.class.getSimpleName() + "! I am sending stream message number " + integer + ".";
System.out.println("Alice says: " + msg);
EchoRequest request = EchoRequest.newBuilder()
.setMessage(msg)
.build();
requestObserver.onNext(request);
// throttle the stream
try Thread.sleep(5000); catch (InterruptedException e)
);
requestObserver.onCompleted();
System.out.println("Alice BiDi stream is done.");
try
// wait for the time set on the stream + the throttle
latch.await((5000 * 20), TimeUnit.MILLISECONDS);
catch (InterruptedException e)
e.printStackTrace();
在服务器服务上,每次收到来自新客户端的新请求时,您都必须使用单例来存储 StreamObserver
s。不是将消息返回给单个观察者responseObserver.onNext(response);
,而是迭代所有观察者并将消息发送给所有singletonObservers.getObservers().forEach(....
。尽管这与负载平衡策略无关,但我认为值得发布,因为如果您没有很好地实现它,您的客户端将不会收到来自其他客户端的消息。
public class ChatServiceImpl extends EchoServiceGrpc.EchoServiceImplBase
private final String name;
private final SingletlonChatStreamObserver singletonObservers;
ChatServiceImpl(String name)
this.name = name;
this.singletonObservers = SingletlonChatStreamObserver.getInstance();
@Override
public StreamObserver<EchoRequest> echoBiDi(StreamObserver<EchoResponse> responseObserver)
System.out.println("received bidirectional call");
singletonObservers.addObserver(responseObserver);
System.out.println("added responseObserver to the pool of observers: " + singletonObservers.getObservers().size());
StreamObserver<EchoRequest> requestObserver = new StreamObserver<EchoRequest>()
@Override
public void onNext(EchoRequest value)
String msg = value.getMessage();
System.out.println("received message: " + msg);
EchoResponse response = EchoResponse.newBuilder()
.setMessage(msg)
.build();
// do not send messages to a single observer but to all observers on the pool
// responseObserver.onNext(response);
// observers.foreach...
singletonObservers.getObservers().forEach(observer ->
observer.onNext(response);
);
@Override
public void onError(Throwable t)
// observers.remove(responseObserver);
singletonObservers.getObservers().remove(responseObserver);
System.out.println("removed responseObserver to the pool of observers");
@Override
public void onCompleted()
// do not complete messages to a single observer but to all observers on the pool
// responseObserver.onCompleted();
// observers.foreach
singletonObservers.getObservers().forEach(observer ->
observer.onCompleted();
);
// observers.remove(responseObserver);
System.out.println("removed responseObserver to the pool of observers");
;
return requestObserver;
这是我的SingletlonChatStreamObserver
为所有 3 台服务器只有一个对象:
public class SingletlonChatStreamObserver implements Serializable
private static volatile SingletlonChatStreamObserver singletonSoleInstance;
private static volatile ArrayList<StreamObserver<EchoResponse>> observers;
private SingletlonChatStreamObserver()
//Prevent form the reflection api.
if (singletonSoleInstance != null)
throw new RuntimeException("Use getInstance() method to get the single instance of this class.");
public static SingletlonChatStreamObserver getInstance()
if (singletonSoleInstance == null) //if there is no instance available... create new one
synchronized (SingletlonChatStreamObserver.class)
if (singletonSoleInstance == null)
observers = new ArrayList<StreamObserver<EchoResponse>>();
singletonSoleInstance = new SingletlonChatStreamObserver();
return singletonSoleInstance;
//Make singleton from serializing and deserialize operation.
protected SingletlonChatStreamObserver readResolve()
return getInstance();
public void addObserver(StreamObserver<EchoResponse> streamObserver)
observers.add(streamObserver);
public ArrayList<StreamObserver<EchoResponse>> getObservers()
return observers;
我将在我的explore-grpc 项目中提交完整的代码。
【讨论】:
以上是关于扩展 gRPC 双向流式聊天服务的主要内容,如果未能解决你的问题,请参考以下文章