Thrift RPC框架实现原理解析
Posted 咬定青松
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Thrift RPC框架实现原理解析相关的知识,希望对你有一定的参考价值。
*本文为「码上观世界」原创内容
今日观点:当你是小人物的时候没有看书,当你成大人物后,看书也晚了。大人物没时间看书。你最卑微、最没地位、最不重要、最不被人看中的时候,是你时间最多的时候,你有时间去体会,之后就很难有了。高位是运用积累的,不是进行积累的。当你到达领导的位置时,你说“我来学习学习”,晚了。积累必须在到达高位之前完成,这就是真正的领导者。领导位置是你学习的地方吗?这是你决策的地方。你到这儿开始学习,熟悉情况,然后才尝试决策,你的单位、你的国家、你的民族将付出极其高的代价为你支付学费。
Thrift 服务模型
Thrift提供的网络服务模型:单线程、多线程、事件驱动,从另一个角度划分为:阻塞服务模型、非阻塞服务模型。其中阻塞服务模型有:TSimpleServer、TThreadPoolServer。非阻塞服务模型有:TNonblockingServer、THsHaServer和TThreadedSelectorServer。
TSimpleServer的工作模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,但是一次只能接收和处理一个socket连接,效率比较低。它主要用于演示Thrift的工作过程,在实际开发过程中很少用到它。
TThreadPoolServer模式采用阻塞socket方式工作,主线程负责阻塞式监听是否有新socket到来,具体的业务处理交由一个线程池来处理。该模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新的请求会进入队列中排队等待处理。
TNonblockingServer模式也是单线程工作,但是采用NIO的模式,借助Channel/Selector机制, 采用IO事件模型来处理。所有的socket都被注册selector中,在一个线程中通过seletor循环监控所有的socket。每次selector循环结束时,处理所有的处于就绪状态的socket,对于有数据到来的socket进行数据读取操作,对于有数据发送的socket则进行数据发送操作,对于监听socket则产生一个新业务socket并将其注册到selector上。虽然一个 NIO 线程可以同时接收多个 Socket 连接,但在处理任务时仍然是阻塞的。TNonblockingServer要求底层的传输通道必须使用TFramedTransport。
THsHaServer继承于TNonblockingServer,引入了线程池提高了任务处理的并发能力。THsHaServer是半同步半异步(Half-Sync/Half-Async)的处理模式,Half-Aysnc用于IO事件处理(Accept/Read/Write),Half-Sync用于业务handler对rpc的同步处理上。THsHaServer和TNonblockingServer一样,要求底层的传输通道必须使用TFramedTransport。
TThreadedSelectorServer是对THsHaServer的一种扩充,它将selector中的读写IO事件(read/write)从主线程中分离出来。同时引入worker工作线程池,它也是种Half-Sync/Half-Async的服务模型。
TThreadedSelectorServer模式是目前Thrift提供的最高级的线程服务模型,它内部有如果几个部分构成:
一个AcceptThread线程对象,专门用于处理监听socket上的新连接。
若干个SelectorThread对象专门用于处理业务socket的网络I/O读写操作,所有网络数据的读写均是有这些线程来完成。
一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求数据读取之后,交给ExecutorService线程池中的线程完成此次调用的具体执行。主要用于处理每个rpc请求的handler回调处理(这部分是同步的)。
开发示例
编写thrift接口定义文件,如:hello_service.thrift:
namespace java org.learn.hive.thrift.service
service HelloWorldService {
string sayHello(1:string username)
}
然后通过官方提供的接口编译工具thrift执行以下命令,即自动创建框架运行所需要的各种实现类:
thrift-0.14.2.exe -gen java hello_service.thrift
这些类一起完成,接口的定义、消息的发送、处理和接收过程。这些类作为静态类都位于接口定义的名称HelloWorldService之中。这些自动创建的内部类包括同步类和异步类两种,以同步类为例,将创建的类列举如下:
根据接口定义文件创建的类接口:
public interface Iface {
public String sayHello(String username) throws org.apache.thrift.TException;
}
实现类接口Iface的Client,用户客户端发起请求调用:
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
public String sayHello(String username) throws org.apache.thrift.TException{
send_sayHello(username);
return recv_sayHello();
}
public void send_sayHello(String username) throws org.apache.thrift.TException{
sayHello_args args = new sayHello_args();
args.setUsername(username);
sendBase("sayHello", args);
}
public String recv_sayHello() throws org.apache.thrift.TException{
sayHello_result result = new sayHello_result();
receiveBase(result, "sayHello");
if (result.isSetSuccess()) {
return result.success;
}
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHello failed: unknown result");
}
}
定义处理类,在该类中维护注册实现的方法和相应的ProcessFunction:
public static class Processor<I extends Iface> extends org.apache.thrift.
TBaseProcessor<I> implements org.apache.thrift.TProcessor {
private static <I extends Iface> java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
//sayHello为实现的ProcessFunction子类
processMap.put("sayHello", new sayHello());
return processMap;
}}
除了Iface 、Client和Processor分为同步和异步两种类型的实现,剩下的实现类就是公共支持类:ProcessFunction的子类sayHello、封装方法调用参数处理的sayHello_args和封装方法调用结果的sayHello_result。
应用开发只需要提供Iface的业务处理逻辑的实现类HelloWorldServiceImpl 即可:
public class HelloWorldServiceImpl implements HelloWorldService.Iface{
@Override
public String sayHello(String username) {
return "Hi," + username;
}
}
然后再创建Server和发起客户端应用。比如启动HelloServer示例:
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>
(new HelloWorldServiceImpl());
TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
TServer.Args tArgs = new TServer.Args(serverTransport);
tArgs.processor(tprocessor);
tArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TSimpleServer(tArgs);
System.out.println("HelloWorld TSimpleServer starting ....");
server.serve();
启动客户端示例:
TTransport transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
// 协议要和服务端一致
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.sayHello(userName);
System.out.println("Thrift org.learn.thrift.client result =: " + result);
原理解析
Thrift调用过程中,Thrift客户端和服务器之间主要用到传输层类、协议层类和处理类三个主要的核心类,这三个类的相互协作共同完成rpc的整个调用过程。在调用过程中将按照以下顺序进行协同工作:
(1)将客户端程序调用的函数名和参数传递给协议层(TProtocol),协议层将函数名和参数按照协议格式进行封装,然后封装的结果交给下层的传输层。此处需要注意:要与Thrift服务器程序所使用的协议类型一样,否则Thrift服务器程序便无法在其协议层进行数据解析;
(2)传输层(TTransport)将协议层传递过来的数据进行处理,例如传输层的实现类TFramedTransport就是将数据封装成帧的形式,即“数据长度+数据内容”,然后将处理之后的数据通过网络发送给Thrift服务器;此处也需要注意:要与Thrift服务器程序所采用的传输层的实现类一致,否则Thrift的传输层也无法将数据进行逆向的处理;
(3)Thrift服务器通过传输层(TTransport)接收网络上传输过来的调用请求数据,然后将接收到的数据进行逆向的处理,例如传输层的实现类TFramedTransport就是将“数据长度+数据内容”形式的网络数据,转成只有数据内容的形式,然后再交付给Thrift服务器的协议类(TProtocol);
(4)Thrift服务端的协议类(TProtocol)将传输层处理之后的数据按照协议进行解封装,并将解封装之后的数据交个Processor类进行处理;
(5)Thrift服务端的Processor类根据协议层(TProtocol)解析的结果,按照函数名找到函数名所对应的函数对象;
(6)Thrift服务端使用传过来的参数调用这个找到的函数对象;
(7)Thrift服务端将函数对象执行的结果交给协议层;
(8)Thrift服务器端的协议层将函数的执行结果进行协议封装;
(9)Thrift服务器端的传输层将协议层封装的结果进行处理,例如封装成帧,然后发送给Thrift客户端程序;
(10)Thrift客户端程序的传输层将收到的网络结果进行逆向处理,得到实际的协议数据;
(11)Thrift客户端的协议层将数据按照协议格式进行解封装,然后得到具体的函数执行结果,并将其交付给调用函数;
以方法调用String getStr(String str1,Strin str2)为例描述上述过程如图所示:
下面参照源代码来详细介绍。服务器示例代码中使用的Thrift框架本身的类包括TProcessor 、TServerSocket 、TBinaryProtocol、Args 和TSimpleServer 。
其中TSimpleServer提供基本的事件监听和处理的简单服务功能,对应上文提到的第一种单线程服务模型:
public class TSimpleServer extends TServer {
private static final Logger LOGGER = LoggerFactory.getLogger(TSimpleServer.class.getName());
public TSimpleServer(AbstractServerArgs args) {
super(args);
}
public void serve() {
try {
this.serverTransport_.listen();
} catch (TTransportException var9) {
LOGGER.error("Error occurred during listening.", var9);
return;
}
if (this.eventHandler_ != null) {
this.eventHandler_.preServe();
}
this.setServing(true);
while(!this.stopped_) {
TTransport client = null;
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
ServerContext connectionContext = null;
client = this.serverTransport_.accept();
if (client != null) {
processor = this.processorFactory_.getProcessor(client);
inputTransport = this.inputTransportFactory_.getTransport(client);
outputTransport = this.outputTransportFactory_.getTransport(client);
inputProtocol = this.inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = this.outputProtocolFactory_.getProtocol(outputTransport);
if (this.eventHandler_ != null) {
connectionContext = this.eventHandler_.createContext(inputProtocol, outputProtocol);
}
do {
if (this.eventHandler_ != null) {
this.eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
} while(processor.process(inputProtocol, outputProtocol));
}
if (this.eventHandler_ != null) {
this.eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
}
if (inputTransport != null) {
inputTransport.close();
}
if (outputTransport != null) {
outputTransport.close();
}
}
this.setServing(false);
}
public void stop() {
this.stopped_ = true;
this.serverTransport_.interrupt();
}
}
TSimpleServer通过事件循环处理客户端请求,内部包括处理器、输入输出Transport和输入输出Protocol,它们通过数据结构Args来提供初始化,Args通过建造者模式构造实例:
public abstract static class AbstractServerArgs<T extends TServer.AbstractServerArgs<T>> {
final TServerTransport serverTransport;
TProcessorFactory processorFactory;
TTransportFactory inputTransportFactory = new TTransportFactory();
TTransportFactory outputTransportFactory = new TTransportFactory();
TProtocolFactory inputProtocolFactory = new Factory();
TProtocolFactory outputProtocolFactory = new Factory();
public AbstractServerArgs(TServerTransport transport) {
this.serverTransport = transport;
}
public T processorFactory(TProcessorFactory factory) {
this.processorFactory = factory;
return this;
}
public T processor(TProcessor processor) {
this.processorFactory = new TProcessorFactory(processor);
return this;
}
public T transportFactory(TTransportFactory factory) {
this.inputTransportFactory = factory;
this.outputTransportFactory = factory;
return this;
}
public T inputTransportFactory(TTransportFactory factory) {
this.inputTransportFactory = factory;
return this;
}
public T outputTransportFactory(TTransportFactory factory) {
this.outputTransportFactory = factory;
return this;
}
public T protocolFactory(TProtocolFactory factory) {
this.inputProtocolFactory = factory;
this.outputProtocolFactory = factory;
return this;
}
public T inputProtocolFactory(TProtocolFactory factory) {
this.inputProtocolFactory = factory;
return this;
}
public T outputProtocolFactory(TProtocolFactory factory) {
this.outputProtocolFactory = factory;
return this;
}
}
public static class Args extends TServer.AbstractServerArgs<TServer.Args> {
public Args(TServerTransport transport) {
super(transport);
}
}
}
TSimpleServer中比较重要的TProcessor 来源于对tArgs的设置:
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>
(new HelloWorldServiceImpl());
TServer.Args tArgs = new TServer.Args(serverTransport);
tArgs.processor(tprocessor);
其中Processor继承TBaseProcessor:
public abstract class TBaseProcessor<I> implements TProcessor {
public boolean process(TProtocol in, TProtocol out) throws TException {
TMessage msg = in.readMessageBegin();
ProcessFunction fn = (ProcessFunction)this.processMap.get(msg.name);
fn.process(msg.seqid, in, out, this.iface);
return true;
}
}
processMap维护了注册的方法名对应的ProcessFunction,ProcessFunction作为抽象类,提供了基本的参数读取、方法调用和结果响应,参见其process方法实现:
public abstract class ProcessFunction<I, T extends TBase> {
public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
TBase args = this.getEmptyArgsInstance();
args.read(iprot);
iprot.readMessageEnd();
TBase result = null;
result = this.getResult(iface, args);
if (!this.isOneway()) {
oprot.writeMessageBegin(new TMessage(this.getMethodName(), (byte)2, seqid));
result.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
}
}
}
ProcessFunction的其他方法是随着不同的方法实现不同的逻辑,比如sayHello的实现:
public static class sayHello<I extends Iface> extends org.apache.thrift.
ProcessFunction<I, sayHello_args> {
public sayHello() {
super("sayHello");
}
public sayHello_args getEmptyArgsInstance() {
return new sayHello_args();
}
protected boolean isOneway() {
return false;
}
protected boolean rethrowUnhandledExceptions() {
return false;
}
public sayHello_result getResult(I iface, sayHello_args args) throws org.apache.thrift.TException {
sayHello_result result = new sayHello_result();
result.success = iface.sayHello(args.username);
return result;
}
}
在getResult中调用用户提供的实现类HelloWorldServiceImpl。
通讯协议
Thrift可以让用户选择客户端与服务端之间传输通信协议的类别,在传输协议上总体划分为文本(text)和二进制(binary)传输协议。为节约带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数,有时还会使用基于文本类型的协议,这取决于实际需求。常用协议有以下几种:
TBinaryProtocol:二进制编码格式进行数据传输
TCompactProtocol:高效率的、密集的二进制编码格式进行数据传输
TJSONProtocol:使用JSON文本的数据编码协议进行数据传输
TSimpleJSONProtocol:只提供JSON只写的协议,适用于通过脚本语言解析
我们的示例中使用TBinaryProtocol协议, 类TBinaryProtocol是TProtocol的一个实现类,TBinaryProtocol协议规定采用这种协议格式的进行消息传输时,需要为消息内容封装一个首部,TBinaryProtocol协议的首部有两种操作方式:一种是严格读写模式,一种值普通的读写模式;这两种模式下消息首部的组织方式不一样,在创建时也可以自己指定使用那种模式,但是要注意,如果要指定模式,Thrift的服务器端和客户端都需要指定。
严格读写模型下的消息首部的前16字节固定为版本号:0x8001,如图所示:
在严格读写模式下,首部中前32字节包括固定的16字节协议版本号0x8001,8字节的0x00,8字节的消息类型;然后是若干字节字符串格式的消息名称,消息名称的组织方式也是“长度+内容”的方式;再下来是32位的消息序列号;在序列号之后的才是消息内容。
普通读写模式下,没有版本信息,首部的前32字节就是消息的名称,然后是消息的名字,接下来是32为的消息序列号,最后是消息的内容。
通信过程中消息的首部在TBinaryProtocol类中进行通过readMessageBegin读取,通过writeMessageBegin写入;但是消息的内容读取在返回值封装类中进行;
下面按照数据读取流程来看看其实现:
#TBaseProcessor.java
public abstract class TBaseProcessor<I> implements TProcessor {
public boolean process(TProtocol in, TProtocol out) throws TException {
TMessage msg = in.readMessageBegin();
ProcessFunction fn = (ProcessFunction)this.processMap.get(msg.name);
fn.process(msg.seqid, in, out, this.iface);
return true;
}
}
readMessageBegin详细过程如下:
[1]首先从传输过来的网络数据中读取32位数据,然后根据首位是否为1来判断当前读到的消息是严格读写模式还是普通读写模式;如果是严格读写模式则这32位数据包括版本号和消息类型,否则这32位保存的是后面的消息名称
[2]读取消息的名称,如果是严格读写模式,则消息名称为字符串格式,保存方式为“长度+内容”的方式,如果是普通读写模式,则消息名称的长度直接根据[1]中读取的长度来读取消息名称;
[3]读取消息类型,如果是严格读写模式,则消息类型已经由[1]中读取出来了,在其32位数据中的低8位中保存着;如果是普通读写模式,则要继续读取一字节的消息类型;
[4]读取32为的消息序列号;
读取数据的过程是在函数返回值的封装类中来完成,根据读取的数值的类型来具体读取数据的内容;在TBinaryProtocol协议中readMessageEnd函数为空。
readMessageBegin执行之后,得到的TMessage内容如下图:
TMessage的type定义在TMessageType中:
public final class TMessageType {
public static final byte CALL = 1;
public static final byte REPLY = 2;
public static final byte EXCEPTION = 3;
public static final byte ONEWAY = 4;
}
调用序号使用在客户端存根类中(实际上是在基类TServiceClient)中保存的一个序列号,每次调用时均使用此序列号,使用完再将序号加1。
private static class sayHello_argsStandardScheme extends org.apache.thrift.scheme
.StandardScheme<sayHello_args> {
public void read(org.apache.thrift.protocol.TProtocol iprot, sayHello_args struct)
throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 1: // USERNAME
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.username = iprot.readString();
struct.setUsernameIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
// check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}
schemeField =iprot.readFieldBegin() 执行之后,得到的schemeField内容如下图:
字段包括type和field-id。type是字段的数据类型的标识号,field-id是Thrift IDL定义的字段次序,比如定义的username参数次序1。Thrift提供了TType,对不同的数据类型(type)提供了唯一标识的type:
public final class TType {
public static final byte STOP = 0;
public static final byte VOID = 1;
public static final byte BOOL = 2;
public static final byte BYTE = 3;
public static final byte DOUBLE = 4;
public static final byte I16 = 6;
public static final byte I32 = 8;
public static final byte I64 = 10;
public static final byte STRING = 11;
public static final byte STRUCT = 12;
public static final byte MAP = 13;
public static final byte SET = 14;
public static final byte LIST = 15;
public static final byte ENUM = 16;
public TType() {
}
}
写消息过程包含以下内容:
先writeMessageBegin表示开始传输消息了,写消息头。Message里面定义了方法名,调用的类型,版本号,消息seqId
接下来是写方法的参数,实际就是写消息体。如果参数是一个类,就writeStructBegin
接下来写字段,writeFieldBegin, 这个方法会写接下来的字段的数据类型和顺序号。这个顺序号是Thrfit对要传输的字段的一个编码,从1开始
如果是一个集合就writeListBegin/writeMapBegin,如果是一个基本数据类型,比如int, 就直接writeI32
每个复杂数据类型写完都调用writeXXXEnd,直到writeMessageEnd结束
#ProcessFunction.java
oprot.writeMessageBegin(new TMessage(this.getMethodName(), (byte)2, seqid));
result.write(oprot);
oprot.writeMessageEnd();
writeMessageBegin函数需要一个参数TMessage作为消息的首部,写入过程与读取过程类似,首先判断需要执行严格读写模式还是普通读写模式,然后分别按照读写模式的具体消息将消息首部写入TBinaryProtocol的TTransport成员的buf中。writeStruct和写writeField参考下面的过程:
private static class sayHello_resultStandardScheme extends org.apache.thrift.scheme.
StandardScheme<sayHello_result> {
public void write(org.apache.thrift.protocol.TProtocol oprot, sayHello_result struct)
throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
if (struct.success != null) {
oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
oprot.writeString(struct.success);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
}
以上是关于Thrift RPC框架实现原理解析的主要内容,如果未能解决你的问题,请参考以下文章