Thrift RPC框架实现原理解析

Posted 咬定青松

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Thrift RPC框架实现原理解析相关的知识,希望对你有一定的参考价值。

*本文为「码上观世界」原创内容 

今日观点:当你是小人物的时候没有看书,当你成大人物后,看书也晚了。大人物没时间看书。你最卑微、最没地位、最不重要、最不被人看中的时候,是你时间最多的时候,你有时间去体会,之后就很难有了。高位是运用积累的,不是进行积累的。当你到达领导的位置时,你说“我来学习学习”,晚了。积累必须在到达高位之前完成,这就是真正的领导者。领导位置是你学习的地方吗?这是你决策的地方。你到这儿开始学习,熟悉情况,然后才尝试决策,你的单位、你的国家、你的民族将付出极其高的代价为你支付学费。

Thrift 服务模型

Thrift提供的网络服务模型:单线程、多线程、事件驱动,从另一个角度划分为:阻塞服务模型、非阻塞服务模型。其中阻塞服务模型有:TSimpleServer、TThreadPoolServer。非阻塞服务模型有:TNonblockingServer、THsHaServer和TThreadedSelectorServer。

TSimpleServer的工作模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,但是一次只能接收和处理一个socket连接,效率比较低。它主要用于演示Thrift的工作过程,在实际开发过程中很少用到它。

TThreadPoolServer模式采用阻塞socket方式工作,主线程负责阻塞式监听是否有新socket到来,具体的业务处理交由一个线程池来处理。该模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新的请求会进入队列中排队等待处理。

TNonblockingServer模式也是单线程工作,但是采用NIO的模式,借助Channel/Selector机制, 采用IO事件模型来处理。所有的socket都被注册selector中,在一个线程中通过seletor循环监控所有的socket。每次selector循环结束时,处理所有的处于就绪状态的socket,对于有数据到来的socket进行数据读取操作,对于有数据发送的socket则进行数据发送操作,对于监听socket则产生一个新业务socket并将其注册到selector上。虽然一个 NIO 线程可以同时接收多个 Socket 连接,但在处理任务时仍然是阻塞的。TNonblockingServer要求底层的传输通道必须使用TFramedTransport。

THsHaServer继承于TNonblockingServer,引入了线程池提高了任务处理的并发能力。THsHaServer是半同步半异步(Half-Sync/Half-Async)的处理模式,Half-Aysnc用于IO事件处理(Accept/Read/Write),Half-Sync用于业务handler对rpc的同步处理上。THsHaServer和TNonblockingServer一样,要求底层的传输通道必须使用TFramedTransport。

TThreadedSelectorServer是对THsHaServer的一种扩充,它将selector中的读写IO事件(read/write)从主线程中分离出来。同时引入worker工作线程池,它也是种Half-Sync/Half-Async的服务模型。

TThreadedSelectorServer模式是目前Thrift提供的最高级的线程服务模型,它内部有如果几个部分构成:

  • 一个AcceptThread线程对象,专门用于处理监听socket上的新连接。

  • 若干个SelectorThread对象专门用于处理业务socket的网络I/O读写操作,所有网络数据的读写均是有这些线程来完成。

  • 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。

  • 一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求数据读取之后,交给ExecutorService线程池中的线程完成此次调用的具体执行。主要用于处理每个rpc请求的handler回调处理(这部分是同步的)。

开发示例

编写thrift接口定义文件,如:hello_service.thrift:

namespace java org.learn.hive.thrift.service

service HelloWorldService {
  string sayHello(1:string username)
}

然后通过官方提供的接口编译工具thrift执行以下命令,即自动创建框架运行所需要的各种实现类:

thrift-0.14.2.exe -gen java hello_service.thrift

这些类一起完成,接口的定义、消息的发送、处理和接收过程。这些类作为静态类都位于接口定义的名称HelloWorldService之中。这些自动创建的内部类包括同步类和异步类两种,以同步类为例,将创建的类列举如下:

根据接口定义文件创建的类接口:

public interface Iface {

  public String sayHello(String username) throws org.apache.thrift.TException;
}

实现类接口Iface的Client,用户客户端发起请求调用:

public static class Client extends org.apache.thrift.TServiceClient implements Iface {
 public String sayHello(String username) throws org.apache.thrift.TException{
    send_sayHello(username);
    return recv_sayHello();
  }

  public void send_sayHello(String username) throws org.apache.thrift.TException{
    sayHello_args args = new sayHello_args();
    args.setUsername(username);
    sendBase("sayHello", args);
  }

  public String recv_sayHello() throws org.apache.thrift.TException{
    sayHello_result result = new sayHello_result();
    receiveBase(result, "sayHello");
    if (result.isSetSuccess()) {
      return result.success;
    }
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHello failed: unknown result");
  }
}

定义处理类,在该类中维护注册实现的方法和相应的ProcessFunction:

public static class Processor<I extends Iface> extends org.apache.thrift.

TBaseProcessor<I> implements org.apache.thrift.TProcessor {

  private static <I extends Iface> java.util.Map<String,  org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
//sayHello为实现的ProcessFunction子类
processMap.put("sayHello", new sayHello());

    return processMap;
  }}

除了Iface 、Client和Processor分为同步和异步两种类型的实现,剩下的实现类就是公共支持类:ProcessFunction的子类sayHello、封装方法调用参数处理的sayHello_args和封装方法调用结果的sayHello_result。

应用开发只需要提供Iface的业务处理逻辑的实现类HelloWorldServiceImpl 即可:

public class HelloWorldServiceImpl implements HelloWorldService.Iface{

    @Override
    public String sayHello(String username) {
        return "Hi," + username;
    }
}

然后再创建Server和发起客户端应用。比如启动HelloServer示例:

TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>
(new HelloWorldServiceImpl());

TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
TServer.Args tArgs = new TServer.Args(serverTransport);
tArgs.processor(tprocessor);
tArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TSimpleServer(tArgs);
System.out.println("HelloWorld TSimpleServer starting ....");
server.serve();

启动客户端示例:

TTransport transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
// 协议要和服务端一致
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.sayHello(userName);
System.out.println("Thrift org.learn.thrift.client result =: " + result);

原理解析

 Thrift调用过程中,Thrift客户端和服务器之间主要用到传输层类、协议层类和处理类三个主要的核心类,这三个类的相互协作共同完成rpc的整个调用过程。在调用过程中将按照以下顺序进行协同工作:

(1)将客户端程序调用的函数名和参数传递给协议层(TProtocol),协议层将函数名和参数按照协议格式进行封装,然后封装的结果交给下层的传输层。此处需要注意:要与Thrift服务器程序所使用的协议类型一样,否则Thrift服务器程序便无法在其协议层进行数据解析;

(2)传输层(TTransport)将协议层传递过来的数据进行处理,例如传输层的实现类TFramedTransport就是将数据封装成帧的形式,即“数据长度+数据内容”,然后将处理之后的数据通过网络发送给Thrift服务器;此处也需要注意:要与Thrift服务器程序所采用的传输层的实现类一致,否则Thrift的传输层也无法将数据进行逆向的处理;

(3)Thrift服务器通过传输层(TTransport)接收网络上传输过来的调用请求数据,然后将接收到的数据进行逆向的处理,例如传输层的实现类TFramedTransport就是将“数据长度+数据内容”形式的网络数据,转成只有数据内容的形式,然后再交付给Thrift服务器的协议类(TProtocol);

(4)Thrift服务端的协议类(TProtocol)将传输层处理之后的数据按照协议进行解封装,并将解封装之后的数据交个Processor类进行处理;

(5)Thrift服务端的Processor类根据协议层(TProtocol)解析的结果,按照函数名找到函数名所对应的函数对象;

(6)Thrift服务端使用传过来的参数调用这个找到的函数对象;

(7)Thrift服务端将函数对象执行的结果交给协议层;

(8)Thrift服务器端的协议层将函数的执行结果进行协议封装;

(9)Thrift服务器端的传输层将协议层封装的结果进行处理,例如封装成帧,然后发送给Thrift客户端程序;

(10)Thrift客户端程序的传输层将收到的网络结果进行逆向处理,得到实际的协议数据;

(11)Thrift客户端的协议层将数据按照协议格式进行解封装,然后得到具体的函数执行结果,并将其交付给调用函数;

以方法调用String getStr(String str1,Strin str2)为例描述上述过程如图所示:

下面参照源代码来详细介绍。服务器示例代码中使用的Thrift框架本身的类包括TProcessor 、TServerSocket 、TBinaryProtocol、Args 和TSimpleServer 。

其中TSimpleServer提供基本的事件监听和处理的简单服务功能,对应上文提到的第一种单线程服务模型:

public class TSimpleServer extends TServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TSimpleServer.class.getName());
    public TSimpleServer(AbstractServerArgs args) {
        super(args);
    }

    public void serve() {
        try {
            this.serverTransport_.listen();
        } catch (TTransportException var9) {
            LOGGER.error("Error occurred during listening.", var9);
            return;
        }

        if (this.eventHandler_ != null) {
            this.eventHandler_.preServe();
        }

        this.setServing(true);
        while(!this.stopped_) {
            TTransport client = null;
            TProcessor processor = null;
            TTransport inputTransport = null;
            TTransport outputTransport = null;
            TProtocol inputProtocol = null;
            TProtocol outputProtocol = null;
            ServerContext connectionContext = null;
            client = this.serverTransport_.accept();
            if (client != null) {
                processor = this.processorFactory_.getProcessor(client);
                inputTransport = this.inputTransportFactory_.getTransport(client);
                outputTransport = this.outputTransportFactory_.getTransport(client);
                inputProtocol = this.inputProtocolFactory_.getProtocol(inputTransport);
                outputProtocol = this.outputProtocolFactory_.getProtocol(outputTransport);
                if (this.eventHandler_ != null) {
                    connectionContext = this.eventHandler_.createContext(inputProtocol, outputProtocol);
                }
                do {
                    if (this.eventHandler_ != null) {
                        this.eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
                } while(processor.process(inputProtocol, outputProtocol));
            }

            if (this.eventHandler_ != null) {
                this.eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
            }

            if (inputTransport != null) {
                inputTransport.close();
            }

            if (outputTransport != null) {
                outputTransport.close();
            }
        }
        this.setServing(false);
    }

    public void stop() {
        this.stopped_ = true;
        this.serverTransport_.interrupt();
    }
}

TSimpleServer通过事件循环处理客户端请求,内部包括处理器、输入输出Transport和输入输出Protocol,它们通过数据结构Args来提供初始化,Args通过建造者模式构造实例:

public abstract static class AbstractServerArgs<T extends TServer.AbstractServerArgs<T>> {
        final TServerTransport serverTransport;
        TProcessorFactory processorFactory;
        TTransportFactory inputTransportFactory = new TTransportFactory();
        TTransportFactory outputTransportFactory = new TTransportFactory();
        TProtocolFactory inputProtocolFactory = new Factory();
        TProtocolFactory outputProtocolFactory = new Factory();
        public AbstractServerArgs(TServerTransport transport) {
            this.serverTransport = transport;
        }

        public T processorFactory(TProcessorFactory factory) {
            this.processorFactory = factory;
            return this;
        }

        public T processor(TProcessor processor) {
            this.processorFactory = new TProcessorFactory(processor);
            return this;
        }

        public T transportFactory(TTransportFactory factory) {
            this.inputTransportFactory = factory;
            this.outputTransportFactory = factory;
            return this;
        }

        public T inputTransportFactory(TTransportFactory factory) {
            this.inputTransportFactory = factory;
            return this;
        }

        public T outputTransportFactory(TTransportFactory factory) {
            this.outputTransportFactory = factory;
            return this;
        }

        public T protocolFactory(TProtocolFactory factory) {
            this.inputProtocolFactory = factory;
            this.outputProtocolFactory = factory;
            return this;
        }

        public T inputProtocolFactory(TProtocolFactory factory) {
            this.inputProtocolFactory = factory;
            return this;
        }

        public T outputProtocolFactory(TProtocolFactory factory) {
            this.outputProtocolFactory = factory;
            return this;
        }
    }

    public static class Args extends TServer.AbstractServerArgs<TServer.Args> {
        public Args(TServerTransport transport) {
            super(transport);
        }
    }
}

TSimpleServer中比较重要的TProcessor 来源于对tArgs的设置:

TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>
(new HelloWorldServiceImpl());

TServer.Args tArgs = new TServer.Args(serverTransport);
tArgs.processor(tprocessor);

其中Processor继承TBaseProcessor:

public abstract class TBaseProcessor<I> implements TProcessor {

    public boolean process(TProtocol in, TProtocol out) throws TException {
        TMessage msg = in.readMessageBegin();
        ProcessFunction fn = (ProcessFunction)this.processMap.get(msg.name);
        fn.process(msg.seqid, in, out, this.iface);
        return true;
    }
}

processMap维护了注册的方法名对应的ProcessFunction,ProcessFunction作为抽象类,提供了基本的参数读取、方法调用和结果响应,参见其process方法实现:

public abstract class ProcessFunction<I, T extends TBase> {

    public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
    TBase args = this.getEmptyArgsInstance();
    args.read(iprot);    
    iprot.readMessageEnd();
    TBase result = null;
    result = this.getResult(iface, args);
      if (!this.isOneway()) {
             oprot.writeMessageBegin(new TMessage(this.getMethodName(), (byte)2, seqid));
           result.write(oprot);
           oprot.writeMessageEnd();
         oprot.getTransport().flush();

    }

    }
}        

ProcessFunction的其他方法是随着不同的方法实现不同的逻辑,比如sayHello的实现:

 

public static class sayHello<I extends Iface> extends org.apache.thrift.
ProcessFunction<I, sayHello_args> {
public sayHello() {
    super("sayHello");
  }

  public sayHello_args getEmptyArgsInstance() {
    return new sayHello_args();
  }

  protected boolean isOneway() {
    return false;
  }

  protected boolean rethrowUnhandledExceptions() {
    return false;
  }

  public sayHello_result getResult(I iface, sayHello_args args) throws org.apache.thrift.TException {
    sayHello_result result = new sayHello_result();
    result.success = iface.sayHello(args.username);
    return result;
  }
}

在getResult中调用用户提供的实现类HelloWorldServiceImpl。

通讯协议

Thrift可以让用户选择客户端与服务端之间传输通信协议的类别,在传输协议上总体划分为文本(text)和二进制(binary)传输协议。为节约带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数,有时还会使用基于文本类型的协议,这取决于实际需求。常用协议有以下几种:

TBinaryProtocol:二进制编码格式进行数据传输

TCompactProtocol:高效率的、密集的二进制编码格式进行数据传输

TJSONProtocol:使用JSON文本的数据编码协议进行数据传输

TSimpleJSONProtocol:只提供JSON只写的协议,适用于通过脚本语言解析

我们的示例中使用TBinaryProtocol协议, 类TBinaryProtocol是TProtocol的一个实现类,TBinaryProtocol协议规定采用这种协议格式的进行消息传输时,需要为消息内容封装一个首部,TBinaryProtocol协议的首部有两种操作方式:一种是严格读写模式,一种值普通的读写模式;这两种模式下消息首部的组织方式不一样,在创建时也可以自己指定使用那种模式,但是要注意,如果要指定模式,Thrift的服务器端和客户端都需要指定。

严格读写模型下的消息首部的前16字节固定为版本号:0x8001,如图所示:

在严格读写模式下,首部中前32字节包括固定的16字节协议版本号0x8001,8字节的0x00,8字节的消息类型;然后是若干字节字符串格式的消息名称,消息名称的组织方式也是“长度+内容”的方式;再下来是32位的消息序列号;在序列号之后的才是消息内容。

普通读写模式下,没有版本信息,首部的前32字节就是消息的名称,然后是消息的名字,接下来是32为的消息序列号,最后是消息的内容。

通信过程中消息的首部在TBinaryProtocol类中进行通过readMessageBegin读取,通过writeMessageBegin写入;但是消息的内容读取在返回值封装类中进行;

下面按照数据读取流程来看看其实现:

#TBaseProcessor.java

public abstract class TBaseProcessor<I> implements TProcessor {

    public boolean process(TProtocol in, TProtocol out) throws TException {
        TMessage msg = in.readMessageBegin();
        ProcessFunction fn = (ProcessFunction)this.processMap.get(msg.name);
        fn.process(msg.seqid, in, out, this.iface);
        return true;
    }
}

readMessageBegin详细过程如下:

[1]首先从传输过来的网络数据中读取32位数据,然后根据首位是否为1来判断当前读到的消息是严格读写模式还是普通读写模式;如果是严格读写模式则这32位数据包括版本号和消息类型,否则这32位保存的是后面的消息名称

[2]读取消息的名称,如果是严格读写模式,则消息名称为字符串格式,保存方式为“长度+内容”的方式,如果是普通读写模式,则消息名称的长度直接根据[1]中读取的长度来读取消息名称;

[3]读取消息类型,如果是严格读写模式,则消息类型已经由[1]中读取出来了,在其32位数据中的低8位中保存着;如果是普通读写模式,则要继续读取一字节的消息类型;

[4]读取32为的消息序列号;

读取数据的过程是在函数返回值的封装类中来完成,根据读取的数值的类型来具体读取数据的内容;在TBinaryProtocol协议中readMessageEnd函数为空。

readMessageBegin执行之后,得到的TMessage内容如下图:

TMessage的type定义在TMessageType中:

public final class TMessageType {

   public static final byte CALL = 1;

   public static final byte REPLY = 2;

   public static final byte EXCEPTION = 3;

   public static final byte ONEWAY = 4;

}

调用序号使用在客户端存根类中(实际上是在基类TServiceClient)中保存的一个序列号,每次调用时均使用此序列号,使用完再将序号加1。

private static class sayHello_argsStandardScheme extends org.apache.thrift.scheme
.StandardScheme<sayHello_args> {

public void read(org.apache.thrift.protocol.TProtocol iprot, sayHello_args struct)
throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
    iprot.readStructBegin();
    while (true)
    {
      schemeField = iprot.readFieldBegin();
      if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
        break;
      }
      switch (schemeField.id) {
        case 1: // USERNAME
          if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
            struct.username = iprot.readString();
            struct.setUsernameIsSet(true);
          } else { 
            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
          }
          break;
        default:
          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
      }
      iprot.readFieldEnd();
    }
    iprot.readStructEnd();
    // check for required fields of primitive type, which can't be checked in the validate method
    struct.validate();
  }

schemeField =iprot.readFieldBegin() 执行之后,得到的schemeField内容如下图:

字段包括type和field-id。type是字段的数据类型的标识号,field-id是Thrift IDL定义的字段次序,比如定义的username参数次序1。Thrift提供了TType,对不同的数据类型(type)提供了唯一标识的type:

public final class TType {
    public static final byte STOP = 0;
    public static final byte VOID = 1;
    public static final byte BOOL = 2;
    public static final byte BYTE = 3;
    public static final byte DOUBLE = 4;
    public static final byte I16 = 6;
    public static final byte I32 = 8;
    public static final byte I64 = 10;
    public static final byte STRING = 11;
    public static final byte STRUCT = 12;
    public static final byte MAP = 13;
    public static final byte SET = 14;
    public static final byte LIST = 15;
    public static final byte ENUM = 16;
    public TType() {
    }
}

写消息过程包含以下内容:

  • 先writeMessageBegin表示开始传输消息了,写消息头。Message里面定义了方法名,调用的类型,版本号,消息seqId

  • 接下来是写方法的参数,实际就是写消息体。如果参数是一个类,就writeStructBegin

  • 接下来写字段,writeFieldBegin, 这个方法会写接下来的字段的数据类型和顺序号。这个顺序号是Thrfit对要传输的字段的一个编码,从1开始

  • 如果是一个集合就writeListBegin/writeMapBegin,如果是一个基本数据类型,比如int, 就直接writeI32

  • 每个复杂数据类型写完都调用writeXXXEnd,直到writeMessageEnd结束

#ProcessFunction.java

oprot.writeMessageBegin(new TMessage(this.getMethodName(), (byte)2, seqid));
result.write(oprot);
oprot.writeMessageEnd();

writeMessageBegin函数需要一个参数TMessage作为消息的首部,写入过程与读取过程类似,首先判断需要执行严格读写模式还是普通读写模式,然后分别按照读写模式的具体消息将消息首部写入TBinaryProtocol的TTransport成员的buf中。writeStruct和写writeField参考下面的过程:

private static class sayHello_resultStandardScheme extends org.apache.thrift.scheme.
StandardScheme<sayHello_result> {
  public void write(org.apache.thrift.protocol.TProtocol oprot, sayHello_result struct)
throws org.apache.thrift.TException {
struct.validate();
    oprot.writeStructBegin(STRUCT_DESC);
    if (struct.success != null) {
      oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
      oprot.writeString(struct.success);
      oprot.writeFieldEnd();
    }
    oprot.writeFieldStop();
    oprot.writeStructEnd();
  }
}

以上是关于Thrift RPC框架实现原理解析的主要内容,如果未能解决你的问题,请参考以下文章

Apache thrift - 使用,内部实现及构建一个可扩展的RPC框架

thrift原理分析

RPC远程协议之Thrift入门

非死不可的RPC框架:thrift

VIP_OSP--基于Thrift的RPC框架的基本原理

VIP_OSP--基于Thrift的RPC框架的基本原理