A Deep Dive into the gRPC Communication Framework

Posted by 有山先生


1. Background

Alluxio uses gRPC as its underlying communication framework, so working out Alluxio's server-side threading model requires some grasp of the gRPC code. This article first covers the HTTP/2 foundations underneath gRPC, then uses a small gRPC project to step into the gRPC source and explore its threading model.

2. gRPC Overview

gRPC is an RPC framework from Google, with support for many languages. It offers four calling modes:
Unary RPC

The client sends a single request and gets back a single response: the client invokes a method on the server through its stub and waits for the reply, just like an ordinary function call. For example:

rpc SayHello (HelloRequest) returns (HelloReply) {}

Server streaming RPC

The client sends one request to the server and gets back a stream from which it reads a sequence of response messages, reading until there are none left. In my view this fits when the client needs to pull a series of data from the server. For example:

rpc ListFeatures(Rectangle) returns (stream Feature) {}

Client streaming RPC

The client writes a sequence of messages and sends them to the server, again over a stream. Once it has finished writing, it waits for the server to read them all and return its response. In my view this fits when the client needs to push a series of data to the server. For example:

rpc RecordRoute(stream Point) returns (RouteSummary) {}

Bidirectional streaming RPC

Both sides send a sequence of messages over read/write streams. The two streams operate independently, so client and server can read and write in whatever order they like: the server can wait for all client messages before writing its responses, alternate reads and writes, or use any other combination. For example:

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

3. The Evolution of HTTP Multiplexing

Suppose a client wants to issue 100 HTTP requests. By default it sends the first request, waits for the response, and only then sends the next, which is very inefficient. As HTTP evolved, this behavior was optimized step by step, eventually producing HTTP's multiplexing model.

3.1 HTTP1.0

Under HTTP/1.0, a client sending 100 back-to-back requests goes through the following:

  1. Every HTTP request needs its own TCP connection, including a three-way handshake.
  2. Every TCP connection goes through congestion control: slow start probes the network, and the send window only gradually climbs from its small initial value to the maximum (the window bounds how much data can be in flight at once).
  3. The client must wait for the previous response before sending the next request: if requests were sent out of order there would be no way to match responses back to requests, since HTTP itself carries no such state.

Points 1 and 2 show why HTTP/1.0 performs so poorly: every request pays the extra latency of a TCP handshake plus slow start. Point 3 prevents requests from being issued concurrently. Improving HTTP therefore means attacking both the per-request overhead and the lack of concurrency.

3.2 HTTP1.1

HTTP/1.1 tackles part of this: setting Connection: keep-alive in the request headers enables connection reuse, so requests to the same server share one TCP connection instead of opening a new one each time. Even with reuse, though, traffic still follows the request -> response -> request -> response... pattern; multiple HTTP requests remain serialized:

To send requests in parallel, that is, fire all requests off at once and receive responses in the order the requests were sent, HTTP/1.1 introduced pipelining. The client sends all its requests up front, and the server processes them and returns responses strictly in order:

Pipelining has a fatal flaw: head-of-line blocking. If the server is slow on one request, every request queued behind it must wait, so performance suffers. Because of this, HTTP/1.1 pipelining never saw wide adoption.

3.3 HTTP2

HTTP/2 improves on pipelining by spreading requests across streams, each identified by a stream ID. Requests within one stream are processed in order, while requests on different streams can be processed in parallel. Even if one stream stalls, requests on the other streams are unaffected:

As shown below, all streams share one TCP connection: requests on different streams are sent concurrently, while requests within a single stream are sent in order:

The advantages of HTTP/2:

  1. A client connecting to one server port uses a single TCP connection no matter how many requests it sends.
  2. Requests and responses are multiplexed in parallel by stream ID, achieving concurrent HTTP traffic.

4. Project Code

The project defines a proto file and compiles it into Java message classes and gRPC stub code, then sends requests through the generated stubs. By debugging this project we can work our way into the gRPC communication framework step by step.

4.1 Maven Dependencies

 <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <grpc.version>1.6.1</grpc.version>
    <protobuf.version>3.3.0</protobuf.version>
 </properties>

 <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.52.Final</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.9.0</version>
        </dependency>

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty</artifactId>
            <version>${grpc.version}</version>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <protoSourceRoot>${project.basedir}/src/main/java/proto</protoSourceRoot>
                    <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                    <clearOutputDirectory>false</clearOutputDirectory>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

4.2 Proto Definition

syntax = "proto3";
option java_outer_classname = "Hello";
package protobuf;
option java_generic_services = true;

service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse);
  rpc SayHi (stream HelloRequest) returns (HelloResponse);
  rpc SayGood (HelloRequest) returns (stream HelloResponse);
  rpc SayBad (stream HelloRequest) returns (stream HelloResponse);
  rpc SayOK (stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
  string greeting = 1;
}

message HelloResponse {
  string reply = 1;
}

Compilation generates two classes:

1. Hello is the serialization class, containing the request class HelloRequest and the response class HelloResponse.
2. HelloServiceGrpc is the proxy class, through which the client remotely invokes the server's methods.

The serialization class looks like this:

The proxy class looks like this:

  1. HelloServiceBlockingStub makes blocking calls: the caller blocks until the response arrives.
  2. HelloServiceStub makes asynchronous calls: you pass in a callback, the call returns without waiting, and when a response arrives an async thread invokes the callback directly.
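
As a hedged sketch of the difference (assuming the channel and streamObserver from section 4.4, and the unary sayHello declared in Hello.proto; the signatures follow standard protoc-gen-grpc-java output):

// Blocking stub: the calling thread waits for the reply.
HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);
Hello.HelloResponse response = blockingStub.sayHello(
        Hello.HelloRequest.newBuilder().setGreeting("hi").build());

// Async stub: returns immediately; the observer runs when the reply arrives.
HelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);
asyncStub.sayHello(
        Hello.HelloRequest.newBuilder().setGreeting("hi").build(),
        streamObserver);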

4.3 Server Code

The server's startup method listens on port 19999 and registers StreamHelloServiceImpl as the logic that handles incoming requests:

public class StreamServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        io.grpc.Server server = ServerBuilder.forPort(19999).addService(new StreamHelloServiceImpl()).build().start();
        System.out.println("start server");
        server.awaitTermination();
    }
}
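
One detail worth noting: awaitTermination() blocks the main thread indefinitely. A hedged sketch of a cleaner exit (not in the original example) registers a JVM shutdown hook before blocking:

// Hypothetical addition: stop accepting new calls when the JVM exits.
Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
server.awaitTermination();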

The server-side logic consists of two methods:

  1. When the client remotely invokes sayBad, the server's sayBad handles the request, and the server replies three times for each message the client sends.
  2. When the client remotely invokes sayOK, the server's sayOK handles the request, likewise replying three times per message.
public class StreamHelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
    @Override
    public StreamObserver<Hello.HelloRequest> sayBad(StreamObserver<Hello.HelloResponse> responseObserver) {
        return new StreamObserver<Hello.HelloRequest>() {
            @Override
            public void onNext(Hello.HelloRequest value) {
                System.out.println("receive : " + value.getGreeting());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("bad1: " + value.getGreeting()).build());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("bad2: " + value.getGreeting()).build());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("bad3: " + value.getGreeting()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("error");
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
                responseObserver.onCompleted();
            }
        };
    }

    @Override
    public StreamObserver<Hello.HelloRequest> sayOK(StreamObserver<Hello.HelloResponse> responseObserver) {
        return new StreamObserver<Hello.HelloRequest>() {
            @Override
            public void onNext(Hello.HelloRequest value) {
                System.out.println("receive : " + value.getGreeting());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok1: " + value.getGreeting()).build());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok2: " + value.getGreeting()).build());
                responseObserver.onNext(Hello.HelloResponse.newBuilder().setReply("ok3: " + value.getGreeting()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("error");
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
                responseObserver.onCompleted();
            }
        };
    }
}
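
The proto also declares a unary SayHello that the example never implements. For completeness, a hedged sketch of what its override could look like in the same class (the signature follows the standard generated HelloServiceImplBase):

    // Hypothetical unary handler, not part of the original example:
    // reply exactly once, then complete the stream.
    @Override
    public void sayHello(Hello.HelloRequest request,
            StreamObserver<Hello.HelloResponse> responseObserver) {
        responseObserver.onNext(Hello.HelloResponse.newBuilder()
                .setReply("hello: " + request.getGreeting()).build());
        responseObserver.onCompleted();
    }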

4.4 Client Code

The client defines a callback object, streamObserver, which prints the server's responses. In main it issues the sayBad and sayOK RPCs, sending two messages on each:

public class StreamClient {

    static StreamObserver<Hello.HelloResponse> streamObserver = new StreamObserver<Hello.HelloResponse>() {

        @Override
        public void onNext(Hello.HelloResponse value) {
           System.out.println(value.getReply());
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(t.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("completed");
        }
    };

    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 19999).usePlaintext(true).build();
        HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(channel);
        StreamObserver<Hello.HelloRequest> helloRequestStreamObserver = helloServiceStub.sayBad(streamObserver);
        helloRequestStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("hello: im sad").build());
        helloRequestStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("hello: im happy").build());
        StreamObserver<Hello.HelloRequest> helloOKStreamObserver = helloServiceStub.sayOK(streamObserver);
        helloOKStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("ok: im sad").build());
        helloOKStreamObserver.onNext(Hello.HelloRequest.newBuilder().setGreeting("ok: im happy").build());
        Thread.sleep(1000);
        //channel.shutdown();
    }
}
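
Note that main never calls onCompleted() on the request observers and relies on Thread.sleep to keep the JVM alive long enough to receive responses. A hedged sketch of the tidier ending (these lines would replace the sleep, plus an import of java.util.concurrent.TimeUnit):

        // Half-close both calls: tells the server no more requests are coming,
        // which triggers the server-side onCompleted() callbacks from section 4.3.
        helloRequestStreamObserver.onCompleted();
        helloOKStreamObserver.onCompleted();
        // Then release the channel's resources once in-flight responses have drained.
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);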

4.5 Run Results

The server receives the client's requests:

The client receives the server's responses:

5. Client Execution Flow

5.1 Creating the ManagedChannelImpl Object

Create the ManagedChannel implementation class:

ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 19999).usePlaintext(true).build();

forAddress is called with the target host and port:

public static ManagedChannelBuilder<?> forAddress(String name, int port) {
  return ManagedChannelProvider.provider().builderForAddress(name, port);
}

ManagedChannelProvider.provider() locates the class that supplies the ManagedChannel implementation:

public static ManagedChannelProvider provider() {
  if (provider == null) {
    throw new ProviderNotFoundException("No functional channel service provider found. "
        + "Try adding a dependency on the grpc-okhttp or grpc-netty artifact");
  }
  return provider;
}

As shown below, the ManagedChannelProvider implementation class is loaded via SPI by default:

private static final ManagedChannelProvider provider
    = load(ManagedChannelProvider.class.getClassLoader());

The load method ultimately uses Java's ServiceLoader to find implementations:

public static Iterable<ManagedChannelProvider> getCandidatesViaServiceLoader(
    ClassLoader classLoader) {
  return ServiceLoader.load(ManagedChannelProvider.class, classLoader);
}
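
A minimal standalone illustration of this SPI mechanism (assuming grpc-netty is on the classpath): ServiceLoader scans each META-INF/services/io.grpc.ManagedChannelProvider resource on the classpath and instantiates the classes listed there, so the loop below should print io.grpc.netty.NettyChannelProvider.

import io.grpc.ManagedChannelProvider;
import java.util.ServiceLoader;

public class SpiDemo {
  public static void main(String[] args) {
    // Each provider jar ships a META-INF/services file naming its implementation.
    for (ManagedChannelProvider p : ServiceLoader.load(ManagedChannelProvider.class)) {
      System.out.println(p.getClass().getName());
    }
  }
}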
It turns out the grpc-netty artifact provides the ManagedChannelProvider implementation:

The implementation class is io.grpc.netty.NettyChannelProvider.
The NettyChannelProvider is then used to create a NettyChannelBuilder, which is responsible for building the Netty-backed channel. It records the target host and port:

public NettyChannelBuilder builderForAddress(String name, int port) {
  return NettyChannelBuilder.forAddress(name, port);
}

Debugging shows that builderForAddress initializes a target string field on NettyChannelBuilder's grandparent class:

usePlaintext selects plaintext transport, meaning the data is not encrypted:

public NettyChannelBuilder usePlaintext(boolean skipNegotiation) {
  if (skipNegotiation) {
    negotiationType(NegotiationType.PLAINTEXT);
  } else {
    negotiationType(NegotiationType.PLAINTEXT_UPGRADE);
  }
  return this;
}
Debugging shows that usePlaintext ultimately initializes an enum-typed field on NettyChannelBuilder:

Finally NettyChannelBuilder#build() produces the ManagedChannelImpl object, passing the builder itself, which carries the address, port, and other settings, as a constructor argument:

public ManagedChannel build() {
  return new ManagedChannelImpl(
      this,
      buildTransportFactory(),
      // TODO(carl-mastrangelo): Allow clients to pass this in
      new ExponentialBackoffPolicy.Provider(),
      SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
      GrpcUtil.STOPWATCH_SUPPLIER,
      getEffectiveInterceptors());
}

NettyChannelBuilder thus returns a ManagedChannelImpl, whose constructor takes quite a few parameters:

ManagedChannelImpl(
    AbstractManagedChannelImplBuilder<?> builder,
    ClientTransportFactory clientTransportFactory,
    BackoffPolicy.Provider backoffPolicyProvider,
    ObjectPool<? extends Executor> oobExecutorPool,
    Supplier<Stopwatch> stopwatchSupplier,
    List<ClientInterceptor> interceptors) {
  this.target = checkNotNull(builder.target, "target");
  this.nameResolverFactory = builder.getNameResolverFactory();
  this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams");
  this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
  this.loadBalancerFactory =
      checkNotNull(builder.loadBalancerFactory, "loadBalancerFactory");
  this.executorPool = checkNotNull(builder.executorPool, "executorPool");
  this.oobExecutorPool = checkNotNull(oobExecutorPool, "oobExecutorPool");
  this.executor = checkNotNull(executorPool.getObject(), "executor");
  this.delayedTransport = new DelayedClientTransport(this.executor, this.channelExecutor);
  this.delayedTransport.start(delayedTransportListener);
  this.backoffPolicyProvider = backoffPolicyProvider;
  this.transportFactory =
      new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
  this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
  this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
  if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
    this.idleTimeoutMillis = builder.idleTimeoutMillis;
  } else {
    checkArgument(
        builder.idleTimeoutMillis
            >= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
        "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
    this.idleTimeoutMillis = builder.idleTimeoutMillis;
  }
  this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
  this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
  this.userAgent = builder.userAgent;

  log.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target});
}

Several of the parameters NettyChannelBuilder passes to ManagedChannelImpl are critical, so let's look at them in turn.

5.1.1 First Parameter: AbstractManagedChannelImplBuilder

The first parameter is of type AbstractManagedChannelImplBuilder, an abstract class whose concrete implementation here is NettyChannelBuilder. After initialization the object holds the server's host and port, and NegotiationType.PLAINTEXT marks the connection as plaintext.

5.1.2 Second Parameter: ClientTransportFactory

grpc-netty creates a NettyTransportFactory by calling NettyChannelBuilder#buildTransportFactory. NettyTransportFactory implements ClientTransportFactory and is responsible for creating NettyClientTransport instances, i.e. for using Netty as the data transport:

protected ClientTransportFactory buildTransportFactory() {
  return new NettyTransportFactory(dynamicParamsFactory, channelType, channelOptions,
      negotiationType, sslContext, eventLoopGroup, flowControlWindow, maxInboundMessageSize(),
      maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls);
}

NettyTransportFactory takes its parameters from NettyChannelBuilder and ultimately creates NettyClientTransport objects via NettyTransportFactory#newClientTransport. One NettyClientTransport manages this client's connection to a specific server address and port:

    public ConnectionClientTransport newClientTransport(
        SocketAddress serverAddress, String authority, @Nullable String userAgent) {
      checkState(!closed, "The transport factory is closed.");

      TransportCreationParamsFilter dparams =
          transportCreationParamsFilterFactory.create(serverAddress, authority, userAgent);

      final AtomicBackoff.State keepAliveTimeNanosState = keepAliveTimeNanos.getState();
      Runnable tooManyPingsRunnable = new Runnable() {
        @Override
        public void run() {
          keepAliveTimeNanosState.backoff();
        }
      };
      NettyClientTransport transport = new NettyClientTransport(
          dparams.getTargetServerAddress(), channelType, channelOptions, group,
          dparams.getProtocolNegotiator(), flowControlWindow,
          maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
          keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(),
          tooManyPingsRunnable);
      return transport;
    }

NettyClientTransport holds the Netty-related fields:

class NettyClientTransport implements ConnectionClientTransport {
  private final LogId logId = LogId.allocate(getClass().getName());
  private final Map<ChannelOption<?>, ?> channelOptions;
  // socket address of the RPC server
  private final SocketAddress address;
  // the NioSocketChannel type
  private final Class<? extends Channel> channelType;
  // Netty's NioEventLoopGroup
  private final EventLoopGroup group;
  private final ProtocolNegotiator negotiator;
  private final AsciiString authority;
  private final AsciiString userAgent;
  private final int flowControlWindow;
  private final int maxMessageSize;
  private final int maxHeaderListSize;
  private KeepAliveManager keepAliveManager;
  private final long keepAliveTimeNanos;
  private final long keepAliveTimeoutNanos;
  private final boolean keepAliveWithoutCalls;
  private final Runnable tooManyPingsRunnable;

  private ProtocolNegotiator.Handler negotiationHandler;
  // the client-side handler
  private NettyClientHandler handler;
  // We should not send on the channel until negotiation completes. This is a hard requirement
  // by SslHandler but is appropriate for HTTP/1.1 Upgrade as well.
  // the NioSocketChannel object
  private Channel channel;
  /** If {@link #start} has been called, non-{@code null} if channel is {@code null}. */
  private Status statusExplainingWhyTheChannelIsNull;

NettyClientTransport has two important methods, start and newStream.
The start method connects to the server through Netty:

  public Runnable start(Listener transportListener) {
    lifecycleManager = new ClientTransportLifecycleManager(
        Preconditions.checkNotNull(transportListener, "listener"));
    EventLoop eventLoop = group.next();
    if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
      keepAliveManager = new KeepAliveManager(
          new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
          keepAliveWithoutCalls);
    }

    handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow,
        maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
    NettyHandlerSettings.setAutoWindow(handler);

    negotiationHandler = negotiator.newHandler(handler);

    Bootstrap b = new Bootstrap();
    b.group(eventLoop);
    b.channel(channelType);
    if (NioSocketChannel.class.isAssignableFrom(channelType)) {
      b.option(SO_KEEPALIVE, true);
    }
    for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
      // Every entry in the map is obtained from
      // NettyChannelBuilder#withOption(ChannelOption<T> option, T value)
      // so it is safe to pass the key-value pair to b.option().
      b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
    }

    /**
     * We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
     * is executed in the event loop and we need this handler to be in the pipeline immediately so
     * that it may begin buffering writes.
     */
    b.handler(negotiationHandler);
    ChannelFuture regFuture = b.register();
    channel = regFuture.channel();
    if (channel == null) {
      // Initialization has failed badly. All new streams should be made to fail.
      Throwable t = regFuture.cause();
      if (t == null) {
        t = new IllegalStateException("Channel is null, but future doesn't have a cause");
      }
      statusExplainingWhyTheChannelIsNull = Utils.statusFromThrowable(t);
      // Use a Runnable since lifecycleManager calls transportListener
      return new Runnable() {
        @Override
        public void run() {
          // NOTICE: we are not calling lifecycleManager from the event loop. But there isn't really
          // an event loop in this case, so nothing should be accessing the lifecycleManager. We
          // could use GlobalEventExecutor (which is what regFuture would use for notifying
          // listeners in this case), but avoiding on-demand thread creation in an error case seems
          // a good idea and is probably clearer threading.
          lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull);
        }
      };
    }
    // Start the write queue as soon as the channel is constructed
    handler.startWriteQueue(channel);
    // Start the connection operation to the server.
    channel.connect(address);
    // This write will have no effect, yet it will only complete once the negotiationHandler
    // flushes any pending writes.
    channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
          // Need to notify of this failure, because NettyClientHandler may not have been added to
          // the pipeline before the error occurred.
          lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()));
        }
      }
    });
    // Handle transport shutdown when the channel is closed.
    channel.closeFuture().addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        // Typically we should have noticed shutdown before this point.
        lifecycleManager.notifyTerminated(
            Status.INTERNAL.withDescription("Connection closed with unknown cause"));
      }
    });

    if (keepAliveManager != null) {
      keepAliveManager.onTransportStarted();
    }

    return null;
  }

During connection setup the code registers NettyClientHandler, the centerpiece of gRPC communication: it defines both how HTTP/2 frames are sent and how incoming frames are handled. NettyClientHandler implements ChannelInboundHandler as well as ChannelOutboundHandler; its inheritance hierarchy is shown below:

NettyClientHandler overrides the ChannelOutboundHandler write method and dispatches on the type of msg:

  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
          throws Exception {
    if (msg instanceof CreateStreamCommand) {
      createStream((CreateStreamCommand) msg, promise);
    } else if (msg instanceof SendGrpcFrameCommand) {
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
    } else if (msg instanceof CancelClientStreamCommand) {
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
    } else if (msg instanceof SendPingCommand) {
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
    } else if (msg instanceof GracefulCloseCommand) {
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
    } else if (msg instanceof ForcefulCloseCommand) {
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
    } else if (msg == NOOP_MESSAGE) {
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
    } else {
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
    }
  }

Most importantly, sendGrpcFrame sends data frames to the remote side:

  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
      ChannelPromise promise) {
    // Call the base class to write the HTTP/2 DATA frame.
    // Note: no need to flush since this is handled by the outbound flow controller.
    encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
  }

The send path is fairly involved. Roughly, DefaultHttp2RemoteFlowController applies flow control to bound how much data goes out, and DefaultHttp2FrameWriter writes the frames, which come in two kinds here: HEADERS frames and payload (DATA) frames.
The writeHeaders method sends a HEADERS frame:

    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
            Http2Headers headers, int streamDependency, short weight, boolean exclusive,
            int padding, boolean endStream, ChannelPromise promise) {
        return writeHeadersInternal(ctx, streamId, headers, padding, endStream,
                true, streamDependency, weight, exclusive, promise);
    }

writeHeadersInternal defines how a HEADERS frame is written; simplified, it does:

   writeFrameHeaderInternal(buf, payloadLength, HEADERS, flags, streamId);
   writePaddingLength(buf, padding);

writeFrameHeaderInternal is shared by HEADERS and DATA frames. It writes the frame type, stream ID, length, and flags; HEADERS frames use the HEADERS type:

    static void writeFrameHeaderInternal(ByteBuf out, int payloadLength, byte type,
            Http2Flags flags, int streamId) {
        out.writeMedium(payloadLength);
        out.writeByte(type);
        out.writeByte(flags.value());
        out.writeInt(streamId);
    }
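
To make the byte layout concrete, here is a small self-contained sketch of the same 9-byte HTTP/2 frame header (per RFC 7540: 24-bit payload length, 8-bit type, 8-bit flags, 32-bit stream ID with the high bit reserved), written with a plain ByteBuffer instead of Netty's ByteBuf:

import java.nio.ByteBuffer;

public class FrameHeaderSketch {
  static ByteBuffer frameHeader(int payloadLength, byte type, byte flags, int streamId) {
    ByteBuffer buf = ByteBuffer.allocate(9);
    buf.put((byte) (payloadLength >>> 16)); // writeMedium: 3-byte big-endian length
    buf.put((byte) (payloadLength >>> 8));
    buf.put((byte) payloadLength);
    buf.put(type);                          // 0x0 = DATA, 0x1 = HEADERS, ...
    buf.put(flags);
    buf.putInt(streamId & 0x7FFFFFFF);      // high bit is reserved, always zero
    buf.flip();
    return buf;
  }
}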

When a payload frame is sent, writeData is called; it invokes writeFrameHeaderInternal with the DATA type to mark a payload frame:

    writeFrameHeaderInternal(frameHeader2, remainingData, DATA, flags, streamId);
    ctx.write(frameHeader2, promiseAggregator.newPromise());
    // Write the payload.
    ByteBuf lastFrame = data.readSlice(remainingData);
    data = null;
    ctx.write(lastFrame, promiseAggregator.newPromise());

5.2 Creating and Invoking the RPC Proxy Object

In the client's main method, the RPC proxy object is created:

   HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(channel);
   StreamObserver<Hello.HelloRequest> helloRequestStreamObserver = helloServiceStub.sayBad(streamObserver);

HelloServiceGrpc is the RPC proxy class generated by protoc, with the following methods:

The two methods the client invokes are:

    public io.grpc.stub.StreamObserver<protobuf.Hello.HelloRequest> sayBad(
        io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse> responseObserver) {
      return asyncBidiStreamingCall(
          getChannel().newCall(METHOD_SAY_BAD, getCallOptions()), responseObserver);
    }

    public io.grpc.stub.StreamObserver<protobuf.Hello.HelloRequest> sayOK(
        io.grpc.stub.StreamObserver<protobuf.Hello.HelloResponse> responseObserver) {
      return asyncBidiStreamingCall(
          getChannel().newCall(METHOD_SAY_OK, getCallOptions()), responseObserver);
    }

Both go through asyncBidiStreamingCall. Its first argument is the ClientCallImpl created by ManagedChannelImpl; the second, responseObserver, is our own callback. Both are passed on to asyncStreamingRequestCall:

  private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
      ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver,
      boolean streamingResponse) {
    CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<ReqT>(call);
    startCall(
        call,
        new StreamObserverToCallListenerAdapter<ReqT, RespT>(
            responseObserver, adapter, streamingResponse),
        streamingResponse);
    return adapter;
  }

The call object is created by ManagedChannelImpl$RealChannel#newCall, which receives the method to invoke and the executor pool to run callbacks on:

public final class ManagedChannelImpl extends ManagedChannel implements WithLogId {
  private class RealChannel extends Channel {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
        CallOptions callOptions) {
      Executor executor = callOptions.getExecutor();
      if (executor == null) {
        executor = ManagedChannelImpl.this.executor;
      }
      return new ClientCallImpl<ReqT, RespT>(
          method,
          executor,
          callOptions,
          transportProvider,
          terminated ? null : transportFactory.getScheduledExecutorService())
              .setDecompressorRegistry(decompressorRegistry)
              .setCompressorRegistry(compressorRegistry);
    }
  }
}

Note that the executor is created from builder.executorPool:

    // the thread pool
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
    // executorPool.getObject() obtains the shared executor from the pool
    this.executor = checkNotNull(executorPool.getObject(), "executor");

And executorPool is the DEFAULT_EXECUTOR_POOL defined by AbstractManagedChannelImplBuilder:

public abstract class AbstractManagedChannelImplBuilder
        <T extends AbstractManagedChannelImplBuilder<T>> extends ManagedChannelBuilder<T> {
  private static final ObjectPool<? extends Executor> DEFAULT_EXECUTOR_POOL =
      SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
}

The executor's thread pool names its threads grpc-default-executor:

public final class GrpcUtil {
  public static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
      new Resource<ExecutorService>() {
        private static final String NAME = "grpc-default-executor";
        @Override
        public ExecutorService create() {
          return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true));
        }

        @Override
        public void close(ExecutorService instance) {
          instance.shutdown();
        }

        @Override
        public String toString() {
          return NAME;
        }
      };
}
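
As a rough plain-JDK equivalent of what getThreadFactory(NAME + "-%d", true) produces (gRPC builds it with Guava's ThreadFactoryBuilder, if I recall correctly): daemon threads named grpc-default-executor-0, -1, ... backing a cached pool. These are exactly the thread names you see in a debugger or thread dump:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedDaemonPool {
  public static ExecutorService create() {
    final AtomicInteger seq = new AtomicInteger();
    ThreadFactory factory = r -> {
      // Mirrors the "grpc-default-executor-%d" naming and the daemon flag.
      Thread t = new Thread(r, "grpc-default-executor-" + seq.getAndIncrement());
      t.setDaemon(true);
      return t;
    };
    return Executors.newCachedThreadPool(factory);
  }
}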

Once the ClientCallImpl call object exists, its start method runs; by this point responseObserver has been wrapped into a ClientCall.Listener:

public final class ClientCalls {
  private static <ReqT, RespT> void startCall(ClientCall<ReqT, RespT> call,
      ClientCall.Listener<RespT> responseListener, boolean streamingResponse) {
    call.start(responseListener, new Metadata());
    if (streamingResponse) {
      call.request(1);
    } else {
      // Initially ask for two responses from flow-control so that if a misbehaving server sends
      // more than one responses, we can catch it and fail it in the listener.
      call.request(2);
    }
  }
}

Whichever RPC method is invoked, ClientCallImpl#start executes. It creates and starts the client-side stream, a DelayedStream:

final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
  public void start(final Listener<RespT> observer, Metadata headers) {
    checkState(stream == null, "Already started");
    checkNotNull(observer, "observer");
    checkNotNull(headers, "headers");

    final String compressorName = callOptions.getCompressor();
    Compressor compressor = null;
    prepareHeaders(headers, decompressorRegistry, compressor);

    Deadline effectiveDeadline = effectiveDeadline();
    boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
    if (!deadlineExceeded) {
      updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(),
          context.getDeadline(), headers);
      ClientTransport transport = clientTransportProvider.get(
          new PickSubchannelArgsImpl(method, headers, callOptions));
      Context origContext = context.attach();
      try {
        // create the client-side stream
        stream = transport.newStream(method, headers, callOptions);
      } finally {
        context.detach(origContext);
      }
    } else {
      stream = new FailingClientStream(DEADLINE_EXCEEDED);
    }

    if (callOptions.getAuthority() != null) {
      stream.setAuthority(callOptions.getAuthority());
    }
    if (callOptions.getMaxInboundMessageSize() != null) {
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
    }
    if (callOptions.getMaxOutboundMessageSize() != null) {
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
    }
    stream.setCompressor(compressor);
    stream.setDecompressorRegistry(decompressorRegistry);
    // start the client-side stream
    stream.start(new ClientStreamListenerImpl(observer));

    // Delay any sources of cancellation after start(), because most of the transports are broken if
    // they receive cancel before start. Issue #1343 has more details

    // Propagate later Context cancellation to the remote side.
    context.addListener(cancellationListener, directExecutor());
    if (effectiveDeadline != null
        // If the context has the effective deadline, we don't need to schedule an extra task.
        && context.getDeadline() != effectiveDeadline
        // If the channel has been terminated, we don't need to schedule an extra task.
        && deadlineCancellationExecutor != null) {
      deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
    }
    if (cancelListenersShouldBeRemoved) {
      // Race detected! ClientStreamListener.closed may have been called before
      // deadlineCancellationFuture was set / context listener added, thereby preventing the future
      // and listener from being cancelled. Go ahead and cancel again, just to be sure it
      // was cancelled.
      removeContextListenerAndCancelDeadlineFuture();
    }
  }

When the DelayedStream is started, its start method (shown below) runs. Note that it stores the Listener wrapping our streamObserver as a member of the stream, for use later when messages are read:

  public void start(ClientStreamListener listener) {
    checkState(this.listener == null, "already started");

    Status savedError;
    boolean savedPassThrough;
    synchronized (this) {
      this.listener = checkNotNull(listener, "listener");
      // If error != null, then cancel() has been called and was unable to close the listener
      savedError = error;
      savedPassThrough = passThrough;
      if (!savedPassThrough) {
        listener = delayedListener = new DelayedStreamListener(listener);
      }
    }
    if (savedError != null) {
      listener.closed(savedError, new Metadata());
      return;
    }

    if (savedPassThrough) {
      realStream.start(listener);
    } else {
      final ClientStreamListener finalListener = listener;
      delayOrExecute(new Runnable() {
        @Override
        public void run() {
          realStream.start(finalListener);
        }
      });
    }
  }

When the client receives the server's response, the logic defined in streamObserver ultimately runs on an async thread. At that point the EventLoop thread is executing NettyClientHandler, whose ancestor class ByteToMessageDecoder defines the receive path in its channelRead method:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                first = cumulation == null;
                cumulation = cumulator.cumulate(ctx.alloc(),
                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
                // decode the accumulated bytes
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                // omitted
            }
        }
    }
}
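
To see the contract ByteToMessageDecoder imposes, here is a minimal length-prefixed decoder of my own (illustrative only, not gRPC code): decode() is invoked repeatedly with everything accumulated so far and must leave incomplete frames in the buffer until the remaining bytes arrive.

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

public class LengthPrefixedDecoder extends ByteToMessageDecoder {
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
      return;                    // not even the 4-byte length prefix yet
    }
    in.markReaderIndex();
    int length = in.readInt();
    if (in.readableBytes() < length) {
      in.resetReaderIndex();     // partial frame: rewind and wait for more bytes
      return;
    }
    out.add(in.readRetainedSlice(length));  // one complete frame
  }
}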

Inside callDecode, DefaultHttp2FrameReader#processPayloadState is called to parse the frames the server returned:

public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSizePolicy, Configuration {
    private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener)
                    throws Http2Exception {
        if (in.readableBytes() < payloadLength) {
            // Wait until the entire payload has been read.
            return;
        }

        // Only process up to payloadLength bytes.
        int payloadEndIndex = in.readerIndex() + payloadLength;

        // We have consumed the data, next time we read we will be expecting to read a frame header.
        readingHeaders = true;

        // Read the payload and fire the frame event to the listener.
        switch (frameType) {
            case DATA:
                readDataFrame(ctx, in, payloadEndIndex, listener);
                break;
            case HEADERS:
                readHeadersFrame(ctx, in, payloadEndIndex, listener);
                break;
            case PRIORITY:
                readPriorityFrame(ctx, in, listener);
                break;
            case RST_STREAM:
                readRstStreamFrame(ctx, in, listener);
                break;
            case SETTINGS:
                readSettingsFrame(ctx, in, listener);
                break;
            case PUSH_PROMISE:
                readPushPromiseFrame(ctx, in, payloadEndIndex, listener);
                break;
            case PING:
                readPingFrame(ctx, in.readLong(), listener);
                break;
            case GO_AWAY:
                readGoAwayFrame(ctx, in, payloadEndIndex, listener);
                break;
            case WINDOW_UPDATE:
                readWindowUpdateFrame(ctx, in, listener);
                break;
            case CONTINUATION:
                readContinuationFrame(in, payloadEndIndex, listener);
                break;
            default:
                readUnknownFrame(ctx, in, payloadEndIndex, listener);
                break;
        }
        in.readerIndex(payloadEndIndex);
    }
}

What happens next depends on the frame type. For a HEADERS frame, AbstractClientStream$TransportState#inboundHeadersReceived runs along the way:

public abstract class AbstractClientStream extends AbstractStream
    implements ClientStream, MessageFramer.Sink {
  protected abstract static class TransportState extends AbstractStream.TransportState {
    protected void inboundHeadersReceived(Metadata headers) {
      Preconditions.checkState(!statusReported, "Received headers on closed stream");
      statsTraceCtx.clientInboundHeaders();

      Decompressor decompressor = Codec.Identity.NONE;
      String encoding = headers.get(MESSAGE_ENCODING_KEY);
      if (encoding != null) {
        decompressor = decompressorRegistry.lookupDecompressor(encoding);
        if (decompressor == null) {
          deframeFailed(Status.INTERNAL.withDescription(
              String.format("Can't find decompressor for %s", encoding)).asRuntimeException());
          return;
        }
      }
      setDecompressor(decompressor);

      listener().headersRead(headers);
    }
  }
}

which ultimately reaches the headersRead method of ClientCallImpl's stream listener:

final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
    public void headersRead(final Metadata headers) {
      class HeadersRead extends ContextRunnable {
        HeadersRead() {
          super(context);
        }

        @Override
        public final void runInContext() {
          try {
            if (closed) {
              return;
            }
            observer.onHeaders(headers);
          } catch (Throwable t) {
            Status status =
                Status.CANCELLED.withCause(t).withDescription("Failed to read headers");
            stream.cancel(status);
            close(status, new Metadata());
          }
        }
      }

      callExecutor.execute(new HeadersRead());
    }
}

Executing observer.onHeaders(headers) dispatches to StreamObserverToCallListenerAdapter#onHeaders, which is an empty implementation:

public final class ClientCalls {
  private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
      extends ClientCall.Listener<RespT> {
    public void onHeaders(Metadata headers) {
    }
  }
}

When a DATA frame is read, readDataFrame is called, and along the way AbstractClientStream$TransportState#inboundDataReceived runs:

public abstract class AbstractClientStream extends AbstractStream
    implements ClientStream, MessageFramer.Sink {
  protected abstract static class TransportState extends AbstractStream.TransportState {
    protected void inboundDataReceived(ReadableBuffer frame) {
      Preconditions.checkNotNull(frame, "frame");
      boolean needToCloseFrame = true;
      try {
        if (statusReported) {
          log.log(Level.INFO, "Received data on closed stream");
          return;
        }

        needToCloseFrame = false;
        // deframe the received DATA frame
        deframe(frame);
      } finally {
        if (needToCloseFrame) {
          frame.close();
        }
      }
    }
  }
}

The DATA-frame handling is a bit more involved; MessageDeframer#deliver delivers the DATA payload:

public class MessageDeframer implements Closeable, Deframer {
  private void deliver() {
    // We can have reentrancy here when using a direct executor, triggered by calls to
    // request more messages. This is safe as we simply loop until pendingDelivers = 0
    if (inDelivery) {
      return;
    }
    inDelivery = true;
    try {
      // Process the uncompressed bytes.
      while (!stopDelivery && pendingDeliveries > 0 && readRequiredBytes()) {
        switch (state) {
          // the gRPC message prefix inside the DATA frame
          case HEADER:
            processHeader();
            break;
          // the gRPC message body inside the DATA frame
          case BODY:
            // Read the body and deliver the message.
            processBody();
            // Since we've delivered a message, decrement the number of pending
            // deliveries remaining.
            pendingDeliveries--;
            break;
          default:
            throw new AssertionError("Invalid state: " + state);
        }
      }
    // ... rest omitted
  }
}
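
The HEADER/BODY states above parse gRPC's length-prefixed message framing inside HTTP/2 DATA frames: a 1-byte compressed flag, a 4-byte big-endian message length, then the serialized protobuf bytes. A self-contained sketch of that 5-byte prefix:

import java.nio.ByteBuffer;

public class GrpcMessagePrefix {
  static byte[] readMessage(ByteBuffer data) {
    boolean compressed = data.get() != 0;   // 1-byte compressed flag
    int length = data.getInt();             // 4-byte big-endian message length
    byte[] message = new byte[length];
    data.get(message);                      // the serialized protobuf payload
    System.out.println("compressed=" + compressed + ", length=" + length);
    return message;
  }
}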

When processing reaches the message body, MessageDeframer#processBody is called:

public class MessageDeframer implements Closeable, Deframer {
  private void processBody() {
    InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
    nextFrame = null;
    listener.messagesAvailable(new SingleMessageProducer(stream));

    // Done with this frame, begin processing the next header.
    state = State.HEADER;
    requiredLength = HEADER_LENGTH;
  }
}

which then calls messagesAvailable on ClientCallImpl's stream listener (ClientStreamListenerImpl):

final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
  private class ClientStreamListenerImpl implements ClientStreamListener {
    public void messagesAvailable(final MessageProducer producer) {
      class MessagesAvailable extends ContextRunnable {
        MessagesAvailable() {
          super(context);
        }

        @Override
        public final void runInContext() {
          if (closed) {
            GrpcUtil.closeQuietly(producer);
            return;
          }

          InputStream message;
          try {
            while ((message = producer.next()) != null) {
              try {
                // run the DATA handling logic
                observer.onMessage(method.parseResponse(message));
              } catch (Throwable t) {
                GrpcUtil.closeQuietly(message);
                throw t;
              }
              message.close();
            }
          } catch (Throwable t) {
            GrpcUtil.closeQuietly(producer);
            Status status =
                Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
            stream.cancel(status);
            close(status, new Metadata());
          }
        }
      }
      // wrap the message handling in a Runnable and hand it to the thread pool
      callExecutor.execute(new MessagesAvailable());
    }
  }
}

callExecutor is the executor that ClientCallImpl was initialized with: the SHARED_CHANNEL_EXECUTOR pool wrapped in a SerializingExecutor:

    this.callExecutor = executor == directExecutor()
        ? new SerializeReentrantCallsDirectExecutor()
        : new SerializingExecutor(executor);

SerializingExecutor runs the queued Runnables on the async pool strictly one after another, preserving submission order:

  public SerializingExecutor(Executor executor) {
    Preconditions.checkNotNull(executor, "executor must not be null.");
    this.executor = executor;
  }

  /**
   * Runs the given runnable strictly after all Runnables that were submitted
   * before it, and using the {@code executor} passed to the constructor.
   */
  @Override
  public void execute(Runnable r) {
    runQueue.add(checkNotNull(r, "r must not be null."));
    schedule(r);
  }

  private void schedule(@Nullable Runnable removable) {
    if (running.compareAndSet(false, true)) {
      boolean success = false;
      try {
        executor.execute(this);
        success = true;
      } finally {
        // It is possible that at this point that there are still tasks in
        // the queue, it would be nice to keep trying but the error may not
        // be recoverable.  So we update our state and propagate so that if
        // our caller deems it recoverable we won't be stuck.
        if (!success) {
          if (removable != null) {
            // This case can only be reached if this was not currently running, and we failed to
            // reschedule.  The item should still be in the queue for removal.
            // ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
            // throw if the item to remove is null.  If removable is present in the queue twice,
            // the wrong one may be removed.  It doesn't seem possible for this case to exist today.
            // This is important to run in case of RejectedExecutionException, so that future calls
            // to execute don't succeed and accidentally run a previous runnable.
            runQueue.remove(removable);
          }
          running.set(false);
        }
      }
    }
  }
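
A small demo of that ordering guarantee (SerializingExecutor lives in the internal io.grpc.internal package, so treat this as illustrative rather than supported API): even on a multi-threaded cached pool, the five tasks print strictly in submission order.

import io.grpc.internal.SerializingExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerializingDemo {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newCachedThreadPool();
    SerializingExecutor serializing = new SerializingExecutor(pool);
    for (int i = 0; i < 5; i++) {
      final int n = i;
      // Tasks run one at a time, in order, even though the pool has many threads.
      serializing.execute(() ->
          System.out.println("task " + n + " on " + Thread.currentThread().getName()));
    }
    pool.shutdown();
  }
}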

The DATA handling then runs on the async thread: observer.onMessage(method.parseResponse(message)) is really ClientCalls$StreamObserverToCallListenerAdapter#onMessage, which calls observer.onNext(message), i.e. the user-defined handler:

public final class ClientCalls {
  private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
      extends ClientCall.Listener<RespT> {
    public void onMessage(RespT message) {
      if (firstResponseReceived && !streamingResponse) {
        throw Status.INTERNAL
            .withDescription("More than one responses received for unary or client-streaming call")
            .asRuntimeException();
      }
      firstResponseReceived = true;
      observer.onNext(message);

      if (streamingResponse && adapter.autoFlowControlEnabled) {
        // Request delivery of the next inbound message.
        adapter.request(1);
      }
    }
  }
}

That onNext is the anonymous StreamObserver defined in our client launcher class:

public class StreamClient {
    static StreamObserver<Hello.HelloResponse> streamObserver = new StreamObserver<Hello.HelloResponse>() {

        @Override
        public void onNext(Hello.HelloResponse value) {
           System.out.println(value.getReply());
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(t.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("completed");
        }
    };
}

Dispatching the DATA handling to an async thread is presumably to guard against the client spending too long processing a response: if that work ran on Netty's EventLoop thread, the loop would be blocked and unable to service other sockets.

5.3 The Netty Client Connects to the Server

As section 5.2 showed, invoking the sayBad RPC passes through ClientCallImpl#start:

  public void start(final Listener<RespT> observer, Metadata headers) {
    // omitted
    prepareHeaders(headers, decompressorRegistry, compressor);

    Deadline effectiveDeadline = effectiveDeadline();
    boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
    if (!deadlineExceeded) {
      updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(),
          context.getDeadline(), headers);
      // obtain the DelayedClientTransport; ultimately a NettyClientTransport gets created
      ClientTransport transport = clientTransportProvider.get(
          new PickSubchannelArgsImpl(method, headers, callOptions));
      Context origContext = context.attach();
      try {
        // create the DelayedStream object
        stream = transport.newStream(method, headers, callOptions);
      } finally {
        context.detach(origContext);
      }
    } else {
      stream = new FailingClientStream(DEADLINE_EXCEEDED);
    }

    if (callOptions.getAuthority() != null) {
      stream.setAuthority(callOptions.getAuthority());
    }
    if (callOptions.getMaxInboundMessageSize() != null) {
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
    }
    if (callOptions.getMaxOutboundMessageSize() != null) {
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
    }
    stream.setCompressor(compressor);
    stream.setDecompressorRegistry(decompressorRegistry);
    stream.start(new ClientStreamListenerImpl(observer));

    // Delay any sources of cancellation after start(), because most of the transports are broken if
    // they receive cancel before start. Issue #1343 has more details

    // Propagate later Context cancellation to the remote side.
    context.addListener(cancellationListener, directExecutor());
  }

The DelayedClientTransport is obtained through a custom get method:

public final class ManagedChannelImpl extends ManagedChannel implements WithLogId {
    private final ClientCallImpl.ClientTransportProvider transportProvider = new ClientCallImpl.ClientTransportProvider() {
        public ClientTransport get(LoadBalancer.PickSubchannelArgs args) {
            LoadBalancer.SubchannelPicker pickerCopy = ManagedChannelImpl.this.subchannelPicker;
            if (ManagedChannelImpl.this.shutdown.get()) {
                return ManagedChannelImpl.this.delayedTransport;
            } else if (pickerCopy == null) {
                ManagedChannelImpl.this.channelExecutor.executeLater(new Runnable() {
                    public void run() {
                        // exit idle mode
                        ManagedChannelImpl.this.exitIdleMode();
                    }
                }).drain();
                return ManagedChannelImpl.this.delayedTransport;
            } else {
                LoadBalancer.PickResult pickResult = pickerCopy.pickSubchannel(args);
                ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, args.getCallOptions().isWaitForReady());
                return (ClientTransport) (transport != null ? transport : ManagedChannelImpl.this.delayedTransport);
            }
        }
    };
}

Before returning the DelayedClientTransport, ManagedChannelImpl.this.exitIdleMode() takes the channel out of idle mode:

    void exitIdleMode() {
        if (!this.shutdown.get()) {
            if (this.inUseStateAggregator.isInUse()) {
                this.cancelIdleTimer();
            } else {
                this.rescheduleIdleTimer();
            }

            if (this.lbHelper == null) {
                log.log(Level.FINE, "[{0}] Exiting idle mode", this.getLogId());
                this.lbHelper = new LbHelperImpl(this.nameResolver);
                this.lbHelper.lb = this.loadBalancerFactory.newLoadBalancer(this.lbHelper);
                NameResolverListenerImpl listener = new NameResolverListenerImpl(this.lbHelper);

                try {
                    // start the nameResolver
                    this.nameResolver.start(listener);
                } catch (Throwable var3) {
                    listener.onError(Status.fromThrowable(var3));
                }
            }
        }
    }

The nameResolver implementation is DnsNameResolver, which resolves a host name into the equivalent IP addresses:

  public final synchronized void start(Listener listener) {
    Preconditions.checkState(this.listener == null, "already started");
    timerService = SharedResourceHolder.get(timerServiceResource);
    // the thread pool: the GrpcUtil.SHARED_CHANNEL_EXECUTOR defined earlier
    executor = SharedResourceHolder.get(executorResource);
    this.listener = Preconditions.checkNotNull(listener, "listener");
    // resolve the host name
    resolve();
  }

  private void resolve() {
    if (resolving || shutdown) {
      return;
    }
    executor.execute(resolutionRunnable);
  }
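
The delegate resolver's blocking lookup ultimately boils down to the JDK call below (a simplified sketch; later gRPC versions also consult TXT/SRV records). Running it off the timer thread is exactly why resolve() hands the work to the shared executor:

import java.net.InetAddress;

public class ResolveDemo {
  public static void main(String[] args) throws Exception {
    // Resolve a host name to all of its A/AAAA records, as the delegate does.
    for (InetAddress addr : InetAddress.getAllByName("localhost")) {
      System.out.println(addr.getHostAddress());
    }
  }
}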

The resolutionRunnable task then runs on the async thread:

  private final Runnable resolutionRunnable = new Runnable() {
      @Override
      public void run() {
        Listener savedListener;
        synchronized (DnsNameResolver.this) {
          // If this task is started by refresh(), there might already be a scheduled task.
          if (resolutionTask != null) {
            resolutionTask.cancel(false);
            resolutionTask = null;
          }
          if (shutdown) {
            return;
          }
          savedListener = listener;
          resolving = true;
        }
        try {
          if (System.getenv("GRPC_PROXY_EXP") != null) {
            EquivalentAddressGroup server =
                new EquivalentAddressGroup(InetSocketAddress.createUnresolved(host, port));
            savedListener.onAddresses(Collections.singletonList(server), Attributes.EMPTY);
            return;
          }
          ResolutionResults resolvedInetAddrs;
          try {
            resolvedInetAddrs = delegateResolver.resolve(host);
          } catch (Exception e) {
            synchronized (DnsNameResolver.this) {
              if (shutdown) {
                return;
              }
              // Because timerService is the single-threaded GrpcUtil.TIMER_SERVICE in production,
              // we need to delegate the blocking work to the executor
              resolutionTask =
                  timerService.schedule(new LogExceptionRunnable(resolutionRunnableOnExecutor),
                      1, TimeUnit.MINUTES);
            }
            savedListener.onError(Status.UNAVAILABLE.withCause(e));
            return;
          }
          // Each address forms an EAG
          ArrayList<EquivalentAddressGroup> servers = new ArrayList<EquivalentAddressGroup>();
          for (InetAddress inetAddr : resolvedInetAddrs.addresses) {
            servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, port)));
          }
          // invoke Listener#onAddresses to set up connections for these addresses
          savedListener.onAddresses(servers, Attributes.EMPTY);
        } finally {
          synchronized (DnsNameResolver.this) {
            resolving = false;
          }
        }
      }
    };

Execution then reaches onAddresses in ManagedChannelImpl's name-resolver listener:

    public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) {
      if (servers.isEmpty()) {
        onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
        return;
      }
      log.log(Level.FINE, "[0] reso
