RPC-Thrift

A Simple Example

  The IDL file is shown below; for the full IDL syntax, see the official documentation at http://thrift.apache.org/docs/idl.

  Running the Thrift code generator on this file (e.g. thrift --gen java) produces two files: HelloService.java and ResultCommon.java.

namespace java com.mytest.thrift

struct ResultCommon{
    1:i32      resultCode,
    2:string   desc
}

service HelloService{
    ResultCommon sayHello(1:string paramJson)
}

 

  The implementation of the generated HelloService.Iface business interface is as follows:

public class HelloHandler implements HelloService.Iface {
    private Logger logger = LoggerFactory.getLogger(HelloHandler.class);
    @Override
    public ResultCommon sayHello(String paramJson) throws TException {
        logger.info("receive request param : {}", paramJson);
        ResultCommon response = new ResultCommon();
        response.setDesc("Hello World!");
        return response;
    }
}

 

  The Thrift RPC server implementation:

public class RpcServer {
    public static void main(String[] args) throws TTransportException {
        // Blocking synchronous I/O model
        TServerSocket tServerSocket = new TServerSocket(8090);
        HelloService.Processor<HelloService.Iface> processor =
                new HelloService.Processor<HelloService.Iface>(new HelloHandler());
        TThreadPoolServer.Args args1 = new TThreadPoolServer.Args(tServerSocket);
        args1.processor(processor);
        // Use the binary wire format
        args1.protocolFactory(new TBinaryProtocol.Factory());
        // Maximum and minimum number of worker threads in the pool
        args1.maxWorkerThreads(10);
        args1.minWorkerThreads(1);
        // Start the server
        TThreadPoolServer server = new TThreadPoolServer(args1);
        // serve() blocks here
        server.serve();
    }
}

 

  The Thrift RPC client implementation:

public class RpcClient {
    public static void main(String[] args) throws TException {
        TSocket tSocket = new TSocket("127.0.0.1", 8090);
        tSocket.open();
        TProtocol tProtocol = new TBinaryProtocol(tSocket);
        HelloService.Client client = new HelloService.Client(tProtocol);
        String paramJson = "{\"wewe\":\"111\"}";
        ResultCommon resultCommon = client.sayHello(paramJson);
        System.out.println(resultCommon.getDesc());
        tSocket.close();
    }
}

 

  Notes: 1) The Thrift client and server must use the same I/O model; in the example above both sides use the blocking synchronous I/O model.

      2) The Thrift client and server must use the same wire format; in the example above both sides use the binary format TBinaryProtocol (a sketch of switching both sides to a different protocol follows below).
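
  As an illustration of note 2, here is a minimal, hedged sketch of a client for a server whose Args were configured with new TCompactProtocol.Factory() instead of new TBinaryProtocol.Factory(). TCompactProtocol is another protocol shipped with Thrift; the class name CompactRpcClient and the reuse of port 8090 are illustrative assumptions.

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;

public class CompactRpcClient {
    public static void main(String[] args) throws TException {
        TSocket tSocket = new TSocket("127.0.0.1", 8090);
        tSocket.open();
        // TCompactProtocol is a denser binary encoding; the call only decodes correctly
        // if the server side also registered new TCompactProtocol.Factory()
        TProtocol tProtocol = new TCompactProtocol(tSocket);
        HelloService.Client client = new HelloService.Client(tProtocol);
        System.out.println(client.sayHello("{}").getDesc());
        tSocket.close();
    }
}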

Thrift RPC in Detail

  The Thrift protocol stack is shown in the figure below:

  [Figure: the Thrift protocol stack]

    Underlying I/O module: performs the actual data transfer; it can be a socket, a file, a compressed data stream, and so on.

    TTransport: defines how messages travel between the client and the server, sending and receiving them as byte streams. Different TTransport subclasses carry the Thrift byte stream over different I/O modules, e.g. TSocket for socket transport and TFileTransport for file transport.

    TProtocol: defines how messages are serialized, i.e. it converts between structured data (objects, structs, etc.) and byte-stream messages. On the client side it packs structured data into a byte-stream message; on the server side it extracts structured data from the byte-stream message. Different TProtocol subclasses implement different wire formats, e.g. TBinaryProtocol for the binary format.

    TServer: accepts client requests and forwards them to the Processor. The TServer subclasses use different implementation mechanisms, and their performance differs greatly.

    Processor: handles client requests and returns responses, covering RPC request dispatch, argument parsing, and invocation of user-defined code. The Processor code is generated by Thrift from the IDL file; the user only implements the generated interface with business logic. The Processor is the point where the Thrift framework hands control over to user code.

    ServiceClient: sends RPC requests on the client side; like the Processor, this code is generated by Thrift from the IDL file.
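
    To make the layering concrete, here is a hedged client-side sketch of how the pieces stack: a TSocket at the bottom (underlying I/O), a TFramedTransport wrapping it (TTransport layer), a TBinaryProtocol on top of that (TProtocol layer), and the generated HelloService.Client at the top (ServiceClient layer). The class name FramedRpcClient and port 8092 are illustrative assumptions, and the sketch presumes a server that itself uses TFramedTransport, such as the non-blocking servers discussed below.

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class FramedRpcClient {
    public static void main(String[] args) throws TException {
        // Underlying I/O: a plain TCP socket
        TTransport socket = new TSocket("127.0.0.1", 8092);
        // TTransport layer: TFramedTransport prefixes every message with a 4-byte frame size
        TTransport transport = new TFramedTransport(socket);
        transport.open();
        // TProtocol layer: binary serialization of the call name and its arguments
        TProtocol protocol = new TBinaryProtocol(transport);
        // ServiceClient layer: the generated stub that issues the RPC
        HelloService.Client client = new HelloService.Client(protocol);
        System.out.println(client.sayHello("{}").getDesc());
        transport.close();
    }
}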

Implementation of the Thrift Core Classes

  TServer

    TServer is mainly responsible for accepting client requests and forwarding them to the Processor. Its class hierarchy is shown below:

    [Figure: TServer class hierarchy]

    Thrift ships several TServer implementations; they use different models and are suited to different situations.

      TSimpleServer: blocking I/O, single-threaded server; mainly used for testing.

      TThreadPoolServer: blocking I/O, multi-threaded server; the threads come from the ThreadPoolExecutor thread pool in the Java concurrency package.

      AbstractNonblockingServer: abstract class that provides the methods and classes shared by the non-blocking I/O servers.

      TNonblockingServer: multiplexed I/O, single-threaded server; requires TFramedTransport.

      THsHaServer: half-sync/half-async server; business logic invocations are handled by a thread pool; also requires TFramedTransport (a construction sketch follows this list).

      TThreadedSelectorServer: half-sync/half-async server; requires TFramedTransport.
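
      Since the non-blocking servers all listen on a TNonblockingServerTransport and default to TFramedTransport, here is a minimal, hedged sketch of standing up a THsHaServer. HelloHandler is reused from the first example; the class name HsHaRpcServer and port 8092 are illustrative assumptions.

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;

public class HsHaRpcServer {
    public static void main(String[] args) throws TTransportException {
        // Non-blocking servers listen on a TNonblockingServerSocket instead of a TServerSocket
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(8092);
        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
        serverArgs.processor(new HelloService.Processor<HelloService.Iface>(new HelloHandler()));
        serverArgs.protocolFactory(new TBinaryProtocol.Factory());
        // AbstractNonblockingServerArgs installs TFramedTransport.Factory by default,
        // so clients must wrap their TSocket in a TFramedTransport (see the client sketch above)
        THsHaServer server = new THsHaServer(serverArgs);
        server.serve();
    }
}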

    The implementation of each TServer is analyzed in detail below.

    TSimpleServer

      TSimpleServer handles only one connection at a time: it accepts a new connection only after the client has closed the current one. Because all of this work is done with blocking I/O in a single thread, it can serve exactly one client connection, and every other client has to wait until it is accepted. TSimpleServer is therefore very inefficient and must not be used in production. A small construction sketch follows, and after it we walk through the implementation in the source code.
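
      A minimal, hedged construction sketch, reusing HelloHandler and port 8090 from the first example (the class name SimpleRpcServer is an illustrative assumption; TServer.Args is the plain argument holder that TSimpleServer accepts):

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;

public class SimpleRpcServer {
    public static void main(String[] args) throws TTransportException {
        TServerSocket serverTransport = new TServerSocket(8090);
        HelloService.Processor<HelloService.Iface> processor =
                new HelloService.Processor<HelloService.Iface>(new HelloHandler());
        TServer server = new TSimpleServer(new TServer.Args(serverTransport)
                .processor(processor)
                .protocolFactory(new TBinaryProtocol.Factory()));
        // Handles one connection at a time; suitable for testing only
        server.serve();
    }
}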

public void serve() {
  stopped_ = false;
  try {
    // Start the listening socket
    serverTransport_.listen();
  } catch (TTransportException ttx) {
    LOGGER.error("Error occurred during listening.", ttx);
    return;
  }
  setServing(true);    // Mark the server as serving
  // Only one socket connection is handled at a time
  while (!stopped_) {
    TTransport client = null;
    TProcessor processor = null;
    TTransport inputTransport = null;
    TTransport outputTransport = null;
    TProtocol inputProtocol = null;
    TProtocol outputProtocol = null;
    try {
      client = serverTransport_.accept(); // Block until a connection request arrives
      if (client != null) {
        processor = processorFactory_.getProcessor(client);
        inputTransport = inputTransportFactory_.getTransport(client);
        outputTransport = outputTransportFactory_.getTransport(client);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        // Keep processing requests on this connection until process() returns false (the client disconnected)
        while (processor.process(inputProtocol, outputProtocol)) {}
      }
    } catch (TTransportException ttx) {
      // Client died, just move on
    } catch (TException tx) {
      if (!stopped_) {
        LOGGER.error("Thrift error occurred during processing of message.", tx);
      }
    } catch (Exception x) {
      if (!stopped_) {
        LOGGER.error("Error occurred during processing of message.", x);
      }
    }

    if (inputTransport != null) {
      inputTransport.close();
    }

    if (outputTransport != null) {
      outputTransport.close();
    }

  }
  setServing(false); 
}

 

      From the source code, the processing flow of TSimpleServer can be summarized as follows:

      [Figure: TSimpleServer processing flow]

     TThreadPoolServer

      TThreadPoolServer is also based on the blocking I/O model; unlike TSimpleServer, it uses a thread pool to improve throughput.

      The TThreadPoolServer constructor is shown below. It uses the ThreadPoolExecutor from the JDK concurrency package, with a configurable maximum number of worker threads (default Integer.MAX_VALUE) and minimum number of worker threads (default 5). The pool's work queue is a SynchronousQueue, in which every put must wait for a matching take; if no counterpart is available, the put or take blocks.

  // Executor service for handling client connections
  private ExecutorService executorService_;
  // Maximum time to wait for the executor to terminate when stopping the server
  private final TimeUnit stopTimeoutUnit;
  private final long stopTimeoutVal;
  public TThreadPoolServer(Args args) {
    super(args);
    // Synchronous hand-off queue: it has no capacity and every put must wait for a matching take; commonly used to hand a single element between threads
    SynchronousQueue<Runnable> executorQueue =
      new SynchronousQueue<Runnable>();
    stopTimeoutUnit = args.stopTimeoutUnit;
    stopTimeoutVal = args.stopTimeoutVal;
    // Initialize the thread pool
    executorService_ = new ThreadPoolExecutor(args.minWorkerThreads,
                                              args.maxWorkerThreads,
                                              60,
                                              TimeUnit.SECONDS,
                                              executorQueue);
  }
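
       To see why this SynchronousQueue caps the number of simultaneous connections at the pool size, here is a small, hedged, JDK-only sketch of its hand-off behavior: offer() fails unless another thread is already blocked in take(), which is why ThreadPoolExecutor creates a new worker thread (up to maxWorkerThreads) for every accepted connection instead of queueing it. The class name SynchronousQueueDemo is an illustrative assumption.

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();

        // No consumer is waiting in take(), so offer() returns false immediately;
        // inside ThreadPoolExecutor this failed offer triggers the creation of a new worker thread.
        System.out.println(queue.offer(() -> { })); // prints false

        // With a consumer blocked in take(), the hand-off succeeds.
        Thread consumer = new Thread(() -> {
            try {
                queue.take().run();
            } catch (InterruptedException ignored) {
            }
        });
        consumer.start();
        Thread.sleep(100); // crude way to let the consumer reach take()
        System.out.println(queue.offer(() -> System.out.println("handed off"))); // prints true
        consumer.join();
    }
}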

 

   Now look at TThreadPoolServer's serve() method. The main thread does nothing but accept connections; as soon as a connection is accepted, it is handed to a worker thread of the ThreadPoolExecutor, and the main thread goes back to accepting the next client connection. Because the pool's work queue is a SynchronousQueue, the maximum number of client connections TThreadPoolServer can sustain equals the number of threads in the pool; in other words, each client connection occupies one thread. Note that when the number of concurrent client connections is very large, the number of server threads becomes very large as well, which can cause performance problems on the server.

  public void serve() {
    try {
      // Start the listening socket
      serverTransport_.listen();
    } catch (TTransportException ttx) {
      LOGGER.error("Error occurred during listening.", ttx);
      return;
    }
    stopped_ = false;
    setServing(true);
    // Loop until the server is stopped
    while (!stopped_) {
      int failureCount = 0;
      try {
        // Block waiting for a client connection; for each accepted connection create a WorkerProcess and submit it to the thread pool
        TTransport client = serverTransport_.accept();
        WorkerProcess wp = new WorkerProcess(client);
        executorService_.execute(wp);
      } catch (TTransportException ttx) {
        if (!stopped_) {
          ++failureCount;
          LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
        }
      }
    }
    // The server has been stopped; shut down the thread pool
    executorService_.shutdown();

    // Loop until awaitTermination finally does return without an interrupted
    // exception. If we don't do this, then we'll shut down prematurely. We want
    // to let the executorService clear its task queue, closing client sockets
    // appropriately.
    // Loop for at most timeoutMS until awaitTermination completes, so the pool is not shut down prematurely and remaining client sockets are closed properly.
    long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
    long now = System.currentTimeMillis();
    while (timeoutMS >= 0) {
      try {
        // awaitTermination blocks until all tasks have finished after the shutdown request, the timeout elapses, or the current thread is interrupted
        executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
        break;
      } catch (InterruptedException ix) {
        // On interruption, recompute the remaining timeout and keep waiting
        long newnow = System.currentTimeMillis();
        timeoutMS -= (newnow - now);
        now = newnow;
      }
    }
    setServing(false);
  }

 

       Finally, look at the WorkerProcess class, an inner class of TThreadPoolServer. Each WorkerProcess thread is bound to a specific client connection and handles the requests on that connection until it is closed; once the connection is closed, the worker thread goes back to the thread pool.

  private class WorkerProcess implements Runnable {
    private TTransport client_;
    private WorkerProcess(TTransport client) {
      client_ = client;
    }
    public void run() {
      TProcessor processor = null;
      TTransport inputTransport = null;
      TTransport outputTransport = null;
      TProtocol inputProtocol = null;
      TProtocol outputProtocol = null;
      try {
        processor = processorFactory_.getProcessor(client_);
        inputTransport = inputTransportFactory_.getTransport(client_);
        outputTransport = outputTransportFactory_.getTransport(client_);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        // we check stopped_ first to make sure we're not supposed to be shutting
        // down. this is necessary for graceful shutdown.
        // Keep processing requests on this connection until the server is stopped, the connection fails, or the client disconnects
        while (!stopped_ && processor.process(inputProtocol, outputProtocol)) {}
      } catch (TTransportException ttx) {
        // Assume the client died and continue silently
      } catch (TException tx) {
        LOGGER.error("Thrift error occurred during processing of message.", tx);
      } catch (Exception x) {
        LOGGER.error("Error occurred during processing of message.", x);
      }
      // Close the input and output transports
      if (inputTransport != null) {
        inputTransport.close();
      }
      if (outputTransport != null) {
        outputTransport.close();
      }
    }
  }

 

       The processing flow of TThreadPoolServer is shown in the flow chart below:

       [Figure: TThreadPoolServer processing flow]

 

    AbstractNonblockingServer

      AbstractNonblockingServer is the parent class of the non-blocking I/O TServers and provides the methods and helper classes they share. It is built on Java NIO (the multiplexed I/O model). First, look at how it works in the source code: the rough flow of starting the server is startThreads() -> startListening() -> setServing(true) -> waitForShutdown(), with the concrete behavior supplied by the AbstractNonblockingServer subclasses.

public abstract class AbstractNonblockingServer extends TServer {
  protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());

  public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
    // Maximum total number of bytes that may be allocated to read buffers
    public long maxReadBufferBytes = Long.MAX_VALUE;
    // Installs TFramedTransport.Factory as the parent class's inputTransportFactory_ and outputTransportFactory_
    public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
      super(transport);
      transportFactory(new TFramedTransport.Factory());
    }
  }
  private final long MAX_READ_BUFFER_BYTES;
  // Number of bytes currently allocated to read buffers
  private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
  public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
    super(args);
    MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
  }
  /**
   * Begin accepting connections and processing invocations.
   */
  public void serve() {
    // start any IO threads
    if (!startThreads()) {
      return;
    }
    // start listening for client requests, or exit
    if (!startListening()) {
      return;
    }
    setServing(true);    // mark the server as serving
    // this will block while we serve
    waitForShutdown();   // blocks while serving; returns once the server is stopped
    setServing(false);   // mark the server as no longer serving
    // do a little cleanup
    stopListening();     // stop listening on the server port
  }

  /**
   * Starts any threads required for serving.
   * 
   * @return true if everything went ok, false if threads could not be started.
   */
  protected abstract boolean startThreads(); // Start the I/O threads; implemented by subclasses

  /**
   * A method that will block until when threads handling the serving have been
   * shut down.
   */
  protected abstract void waitForShutdown(); // Blocks while the server is running and returns once it is stopped; implemented by subclasses
  // Open the listening port
  protected boolean startListening() {
    try {
      serverTransport_.listen();
      return true;
    } catch (TTransportException ttx) {
      LOGGER.error("Failed to start listening on server socket!", ttx);
      return false;
    }
  }
  // Close the listening port
  protected void stopListening() {
    serverTransport_.close();
  }

  /**
   * Perform an invocation. This method could behave several different ways -
   * invoke immediately inline, queue for separate execution, etc.
   * 
   * @return true if invocation was successfully requested, which is not a
   *         guarantee that invocation has completed. False if the request
   *         failed.
   */
  protected abstract boolean requestInvoke(FrameBuffer frameBuffer); // Request invocation of the business logic for this FrameBuffer; implemented by subclasses
}

 

      The inner class FrameBuffer of AbstractNonblockingServer is the core class with which the non-blocking TServers read and write data. A FrameBuffer goes through several states, and it behaves differently in each; first look at the FrameBufferState enum.

  private enum FrameBufferState {
    // in the midst of reading the frame size off the wire
    READING_FRAME_SIZE,
    // reading the actual frame data now, but not all the way done yet
    READING_FRAME,
    // completely read the frame, so an invocation can now happen
    READ_FRAME_COMPLETE,
    // the invocation is complete; waiting to get switched to listening for write events
    AWAITING_REGISTER_WRITE,
    // started writing response data, not fully complete yet
    WRITING,
    // another thread wants this framebuffer to go back to reading;
    // once it is re-registered for read events it becomes READING_FRAME_SIZE again
    AWAITING_REGISTER_READ,
    // we want our transport and selection key invalidated in the selector thread;
    // a FrameBuffer ends up here when any of the states above fails, and the selector loop then closes the connection
    AWAITING_CLOSE
  }

      If the client expects a result, a FrameBuffer's state transitions are: READING_FRAME_SIZE -> READING_FRAME -> READ_FRAME_COMPLETE -> AWAITING_REGISTER_WRITE -> WRITING -> AWAITING_REGISTER_READ -> READING_FRAME_SIZE.

      If the client does not expect a result (a oneway call), the transitions are: READING_FRAME_SIZE -> READING_FRAME -> READ_FRAME_COMPLETE -> AWAITING_REGISTER_READ -> READING_FRAME_SIZE.

      If an exception occurs in any of these states, the FrameBuffer transitions to AWAITING_CLOSE.
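
      To make the framing concrete: what READING_FRAME_SIZE and READING_FRAME read back is exactly what TFramedTransport writes, namely a 4-byte big-endian frame size followed by the frame payload. Below is a small, hedged, JDK-only sketch of that byte layout, built by hand rather than with Thrift classes; the class name FrameLayoutDemo is an illustrative assumption.

import java.nio.ByteBuffer;

public class FrameLayoutDemo {
    // Builds the byte layout of one frame: a 4-byte big-endian frame size followed by the payload.
    static byte[] frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length); // what READING_FRAME_SIZE consumes
        buf.put(payload);           // what READING_FRAME consumes
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] framed = frame("hello".getBytes());
        // The first 4 bytes decode back to the payload length, just like buffer_.getInt(0) in read() below.
        System.out.println(ByteBuffer.wrap(framed).getInt()); // prints 5
    }
}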

      The FrameBuffer source is analyzed below. A FrameBuffer is bound to a SelectionKey; it reads data from the client, invokes the business logic, writes the result back to the client, and manages changes to the interest set registered for the SelectionKey it is bound to.

  protected class FrameBuffer {
    // the actual transport hooked up to the client; concretely a TNonblockingSocket
    private final TNonblockingTransport trans_;
    // the SelectionKey that corresponds to our transport
    private final SelectionKey selectionKey_;
    // the SelectThread that owns the registration of our transport
    private final AbstractSelectThread selectThread_;
    // where in the process of reading/writing are we?
    private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
    // the ByteBuffer we'll be using to write and read, depending on the state (Java NIO)
    private ByteBuffer buffer_;
    // the locally buffered response produced once the business logic has run
    private TByteArrayOutputStream response_;

    public FrameBuffer(final TNonblockingTransport trans,
        final SelectionKey selectionKey,
        final AbstractSelectThread selectThread) {
      trans_ = trans;
      selectionKey_ = selectionKey;
      selectThread_ = selectThread;
      buffer_ = ByteBuffer.allocate(4); // a TFramedTransport frame size is 4 bytes, so allocate 4 bytes
    }

    /**
     * Give this FrameBuffer a chance to read. The selector loop should have
     * received a read event for this FrameBuffer.
     * 
     * @return true if the connection should live on, false if it should be
     *         closed
     */
    // Perform one read: if the state is READING_FRAME_SIZE, read the frame size; if it is READING_FRAME, read the frame data
    public boolean read() {
      if (state_ == FrameBufferState.READING_FRAME_SIZE) {
        // try to read the frame size completely
        // (internalRead() reads from trans_ into buffer_, never more than buffer_.remaining() bytes)
        if (!internalRead()) {
          return false;
        }

        // if the frame size has been read completely, then prepare to read the
        // actual frame.
        // remaining() == 0 means the 4-byte buffer_ is full, i.e. the frame size has been read completely
        if (buffer_.remaining() == 0) {
          // pull out the frame size as an integer.
          int frameSize = buffer_.getInt(0);
          // validate the frame size
          if (frameSize <= 0) {
            LOGGER.error("Read an invalid frame size of " + frameSize
                + ". Are you using TFramedTransport on the client side?");
            return false;
          }
          // if this frame will always be too large for this server, log the
          // error and close the connection.
          if (frameSize > MAX_READ_BUFFER_BYTES) {
            LOGGER.error("Read a frame size of " + frameSize
                + ", which is bigger than the maximum allowable buffer size for ALL connections.");
            return false;
          }
          // if this frame will push us over the memory limit, then return.
          // with luck, more memory will free up the next time around.
          // (return true and try again on a later read event)
          if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
            return true;
          }
          // increment the amount of memory allocated to read buffers by frameSize
          readBufferBytesAllocated.addAndGet(frameSize);
          // reallocate the readbuffer as a frame-sized buffer for the actual frame data
          buffer_ = ByteBuffer.allocate(frameSize);
          // the frame size passed validation, so switch to READING_FRAME and read the actual data next
          state_ = FrameBufferState.READING_FRAME;
        } else {
          // this skips the check of READING_FRAME state below, since we can't
          // possibly go on to that state if there's data left to be read at
          // this one.
          // buffer_ still has room, i.e. the frame size has not been fully read; return true and read again next time
          return true;
        }
      }

      // it is possible to fall through from the READING_FRAME_SIZE section
      // to READING_FRAME if there's already some frame data available once
      // READING_FRAME_SIZE is complete.

      if (state_ == FrameBufferState.READING_FRAME) {
        if (!internalRead()) {
          return false;
        }
        // since we're already in the select loop here for sure, we can just
        // modify our selection key directly.
        // buffer_ now has capacity frameSize; remaining() == 0 means the frame data has been read completely
        if (buffer_.remaining() == 0) {
          // get rid of the read select interests
          // (deregister the read interest of the SelectionKey associated with this FrameBuffer)
          selectionKey_.interestOps(0);
          // switch the state to READ_FRAME_COMPLETE
          state_ = FrameBufferState.READ_FRAME_COMPLETE;
        }
        // return true; if the frame is not complete yet, it will be read further on the next read event
        return true;
      }
      // if we fall through to this point, then the state must be invalid.
      LOGGER.error("Read was called but state is invalid (" + state_ + ")");
      return false;
    }

    /**
     * Give this FrameBuffer a chance to write its output to the final client.
     */
    public boolean write() {
      if (state_ == FrameBufferState.WRITING) {
        try {
          // write the contents of buffer_ back to the client trans_
          if (trans_.write(buffer_) < 0) {
            return false;
          }
        } catch (IOException e) {
          LOGGER.warn("Got an IOException during write!", e);
          return false;
        }
        // we're done writing. now we need to switch back to reading.
        if (buffer_.remaining() == 0) {
          prepareRead(); // the write is complete; switch back to read mode
        }
        return true;
      }
      LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
      return false;
    }

    /**
     * Give this FrameBuffer a chance to set its interest to write, once data
     * has come in. Adjusts the events registered for selectionKey_; called when the state is one of the AWAITING_* states.
     */
    public void changeSelectInterests() {
      if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
        // set the OP_WRITE interest
        selectionKey_.interestOps(SelectionKey.OP_WRITE);
        state_ = FrameBufferState.WRITING;
      } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
        prepareRead();
      } else if (state_ == FrameBufferState.AWAITING_CLOSE) {
        close();
        selectionKey_.cancel();
      } else {
        LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
      }
    }

    /**
     * Shut the connection down and release this FrameBuffer.
     */
    public void close() {
      // if we're being closed due to an error, we might have allocated a
      // buffer that we need to subtract for our memory accounting.
      if (state_ == FrameBufferState.READING_FRAME || state_ == FrameBufferState.READ_FRAME_COMPLETE) {
        readBufferBytesAllocated.addAndGet(-buffer_.array().length);
      }
      trans_.close();
    }

    /**
     * Check if this FrameBuffer has a full frame read.
     */
    public boolean isFrameFullyRead() {
      return state_ == FrameBufferState.READ_FRAME_COMPLETE;
    }

    /**
     * After the processor has processed the invocation, whatever thread is
     * managing invocations should call this method on this FrameBuffer so we
     * know it's time to start trying to write again. Also, if it turns out that
     * there actually isn't any data in the response buffer, we'll skip trying
     * to write and instead go back to reading.
     */
    // Prepare to return the result
    public void responseReady() {
      // the read buffer is definitely no longer in use, so we will decrement
      // our read buffer count. we do this here as well as in close because
      // we'd like to free this read memory up as quickly as possible for other
      // clients.
      // (the invocation is done, so release the read-buffer accounting)
      readBufferBytesAllocated.addAndGet(-buffer_.array().length);

      if (response_.len() == 0) {
        // go straight to reading again. this was probably an oneway method
        // (no result needs to be returned, so move to AWAITING_REGISTER_READ and get ready for the next read)
        state_ = FrameBufferState.AWAITING_REGISTER_READ;
        buffer_ = null;
      } else {
        // wrap the response data in buffer_
        buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
        // set state that we're waiting to be switched to write. we do this
        // asynchronously through requestSelectInterestChange() because there is
        // a possibility that we're not in the main thread, and thus currently
        // blocked in select(). (this functionality is in place for the sake of
        // the HsHa server.)
        // (move to AWAITING_REGISTER_WRITE, ready to write the response back)
        state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
      }
      // ask the select thread to update the registered selector events
      requestSelectInterestChange();
    }

    /**
     * Actually invoke the method signified by this FrameBuffer.
     * This is where the business logic is called: the Processor is invoked and responseReady()
     * is called afterwards, or the FrameBuffer is marked AWAITING_CLOSE if the invocation throws.
     */
    public void invoke() {
      TTransport inTrans = getInputTransport();
      TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
      TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

      try {
        processorFactory_.getProcessor(inTrans).process(inProt, outProt);
        responseReady();
        return;
      } catch (TException te) {
        LOGGER.warn("Exception while invoking!", te);
      } catch (Throwable t) {
        LOGGER.error("Unexpected throwable while invoking!", t);
      }
      // This will only be reached when there is a throwable.
      state_ = FrameBufferState.AWAITING_CLOSE;
      requestSelectInterestChange();
    }

    // (helper methods such as internalRead(), prepareRead(), getInputTransport(),
    // getOutputTransport() and requestSelectInterestChange() are omitted here)
  }
