再识RPC-thrift

Posted 码农戏码

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了再识RPC-thrift相关的知识,希望对你有一定的参考价值。

RPC

原理

什么是Stub?

Stub是一段代码,用来转换RPC过程中传递的参数。处理内容包括不同OS之间的大小端问题。另外,Client端一般叫Stub,Server端一般叫Skeleton。

生产方式:

  1. 手动生成,比较麻烦;

  2. 自动生成,使用IDL(InterfaceDescriptionLanguate),定义C/S的接口

RPC的套路:

自古深情留不住 唯有套路留人心

RPC最本质的就是通过socket把方法信息传输到远程服务器并执行相应method

在java界的rpc框架的实现手法:

  • 服务端:socket + 反射

  • 客户端:动态代理 + socket

之前也解析过motain框架,、《motain服务端分析》

thrift

由于我司框架是通过thrift改造,发现这个框架没有按java套路出牌,可能这是跨语言类RPC的套路,有必要了解一下

thrift最初由facebook开发用做系统内各语言之间的RPC通信 。2007年由facebook贡献到apache基金 ,08年5月进入apache孵化器,支持多种语言之间的RPC方式的通信:php语言client可以构造一个对象,调用相应的服务方法来调用java语言的服务 ,跨越语言的C/S RPC调用   

示例

IDL文件

 
   
   
 
  1. //HelloService.thrfit

  2. namespace java com.jack.thrift

  3. service HelloService{

  4.    string helloString(1:string what)

  5. }

生成代码

 
   
   
 
  1. 运行  thrift -gen HelloService.thrfit

会生成一个HelloService类

实现服务端与客服端

让服务端打印出客户端传入的参数

服务端

 
   
   
 
  1. public class ThriftServer {

  2.    /**

  3.     * 启动thrift服务器

  4.     * @param args

  5.     */

  6.    public static void main(String[] args) throws Exception {

  7.        try {

  8.            System.out.println("服务端开启....");

  9.            TProcessor tprocessor = new HelloService.Processor<HelloService.Iface>(new HelloServiceImpl());

  10.            // 简单的单线程服务模型

  11.            TServerSocket serverTransport = new TServerSocket(9898);

  12.            TServer.Args tArgs = new TServer.Args(serverTransport);

  13.            tArgs.processor(tprocessor);

  14.            tArgs.protocolFactory(new TBinaryProtocol.Factory());

  15.            TServer server = new TSimpleServer(tArgs);

  16.            server.serve();

  17.        }catch (Exception e) {

  18.            e.printStackTrace();

  19.        }

  20.    }

  21. }

客户端

 
   
   
 
  1. public class ThriftClient {

  2.    public static void main(String[] args) {

  3.        System.out.println("客户端启动....");

  4.        TTransport transport = null;

  5.        try {

  6.            transport = new TSocket("localhost", 9898, 30000);

  7.            // 协议要和服务端一致

  8.            TProtocol protocol = new TBinaryProtocol(transport);

  9.            HelloService.Client client = new HelloService.Client(protocol);

  10.            transport.open();

  11.            String result = client.helloString("哈哈");

  12.            System.out.println(result);

  13.        } catch (TTransportException e) {

  14.            e.printStackTrace();

  15.        } catch (TException e) {

  16.            e.printStackTrace();

  17.        } finally {

  18.            if (null != transport) {

  19.                transport.close();

  20.            }

  21.        }

  22.    }

  23. }

解析

可以看出server,client代码相对很简单,主要看看生成的HelloService类,这个类就是stub代码

来看一下,这个类是如何封装,把method和args传输到远程的

client

 
   
   
 
  1. HelloService.Client client = new HelloService.Client(protocol);

  2. String result = client.helloString("哈哈");

关键点在HelloService.Client.helloString()方法

 
   
   
 
  1. public String helloString(String what) throws org.apache.thrift.TException

  2.    {

  3.      send_helloString(what);

  4.      return recv_helloString();

  5.    }

发送消息

 
   
   
 
  1. public void send_helloString(String what) throws org.apache.thrift.TException

  2.    {

  3.      helloString_args args = new helloString_args();

  4.      args.setWhat(what);

  5.      sendBase("helloString", args);

  6.    }

  1. 把args抽象成了一个类

  2. 属性赋值

  3. 发送

主要看下sendBase()方法

 
   
   
 
  1. private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {

  2.    oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));

  3.    args.write(oprot_);

  4.    oprot_.writeMessageEnd();

  5.    oprot_.getTransport().flush();

  6.  }

  • 1.oprot_.writeMessageBegin 根据Protocol写数据,比如这儿使用的TBinaryProtocol,以二进制写数据

 
   
   
 
  1. public void writeMessageBegin(TMessage message) throws TException {

  2.    if (strictWrite_) {

  3.      int version = VERSION_1 | message.type;

  4.      writeI32(version);

  5.      writeString(message.name);

  6.      writeI32(message.seqid);

  7.    } else {

  8.      writeString(message.name);

  9.      writeByte(message.type);

  10.      writeI32(message.seqid);

  11.    }

  12.  }

再深入看看怎么写二进制数据的

int类型

 
   
   
 
  1. public void writeI32(int i32) throws TException {

  2.    inoutTemp[0] = (byte)(0xff & (i32 >> 24));

  3.    inoutTemp[1] = (byte)(0xff & (i32 >> 16));

  4.    inoutTemp[2] = (byte)(0xff & (i32 >> 8));

  5.    inoutTemp[3] = (byte)(0xff & (i32));

  6.    trans_.write(inoutTemp, 0, 4);

  7.  }

string类型,先写长度,再写bytes

 
   
   
 
  1. public void writeString(String str) throws TException {

  2.    try {

  3.      byte[] dat = str.getBytes("UTF-8");

  4.      writeI32(dat.length);

  5.      trans_.write(dat, 0, dat.length);

  6.    } catch (UnsupportedEncodingException uex) {

  7.      throw new TException("JVM DOES NOT SUPPORT UTF-8");

  8.    }

  9.  }

这儿写最终还是使用Transport.write,比如这儿使用的TSocket

 
   
   
 
  1. public void write(byte[] buf, int off, int len) throws TTransportException {

  2.    if (outputStream_ == null) {

  3.      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");

  4.    }

  5.    try {

  6.      outputStream_.write(buf, off, len);

  7.    } catch (IOException iox) {

  8.      throw new TTransportException(TTransportException.UNKNOWN, iox);

  9.    }

  10.  }

就是写到

 
   
   
 
  1. outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);

  • 2.args.write(oprot_);

 
   
   
 
  1. public void write(org.apache.thrift.protocol.TProtocol oprot, helloString_args struct) throws org.apache.thrift.TException {

  2.        struct.validate();

  3.        oprot.writeStructBegin(STRUCT_DESC);

  4.        if (struct.what != null) {

  5.          oprot.writeFieldBegin(WHAT_FIELD_DESC);

  6.          oprot.writeString(struct.what);

  7.          oprot.writeFieldEnd();

  8.        }

  9.        oprot.writeFieldStop();

  10.        oprot.writeStructEnd();

  11.      }

这就是写field,也就是向输出流里写参数内容

  • 3.oprot_.writeMessageEnd(); 这表示消息写完成了,各个协议处理不同,比如二进制就是空实现,但如json就需要写个"}",以完成json格式

  • 4.oprot_.getTransport().flush(); 直接flush

 
   
   
 
  1. /**

  2.   * Flushes the underlying output stream if not null.

  3.   */

  4.  public void flush() throws TTransportException {

  5.    if (outputStream_ == null) {

  6.      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");

  7.    }

  8.    try {

  9.      outputStream_.flush();

  10.    } catch (IOException iox) {

  11.      throw new TTransportException(TTransportException.UNKNOWN, iox);

  12.    }

  13.  }

client总结

整个发送消息就结束了,虽然没有按套路使用动态代理,而是通过生成的stub代码,把methodName,args给封装好了

server

服务端也没有通过反射的方式

主要逻辑在生成的HelloService$Processor类中

 
   
   
 
  1. public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {

  2.    private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName());

  3.    public Processor(I iface) {

  4.      super(iface, getProcessMap(new java.util.HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));

  5.    }

  6.    protected Processor(I iface, java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {

  7.      super(iface, getProcessMap(processMap));

  8.    }

  9.    private static <I extends Iface> java.util.Map<String,  org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {

  10.      processMap.put("helloString", new helloString());

  11.      return processMap;

  12.    }

  13.    public static class helloString<I extends Iface> extends org.apache.thrift.ProcessFunction<I, helloString_args> {

  14.      public helloString() {

  15.        super("helloString");

  16.      }

  17.      public helloString_args getEmptyArgsInstance() {

  18.        return new helloString_args();

  19.      }

  20.      protected boolean isOneway() {

  21.        return false;

  22.      }

  23.      @Override

  24.      protected boolean handleRuntimeExceptions() {

  25.        return false;

  26.      }

  27.      public helloString_result getResult(I iface, helloString_args args) throws org.apache.thrift.TException {

  28.        helloString_result result = new helloString_result();

  29.        result.success = iface.helloString(args.what);

  30.        return result;

  31.      }

  32.    }

  33.  }

  • 1.先看构造函数

 
   
   
 
  1. protected Processor(I iface, java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {

  2.      super(iface, getProcessMap(processMap));

  3.    }

  4.    private static <I extends Iface> java.util.Map<String,  org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<String, org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {

  5.      processMap.put("helloString", new helloString());

  6.      return processMap;

  7.    }

这段把methodName与对应的处理类映射,那后面的事就简单了,当接受到消息,取得methodName,通过map获取对就的处理类回调就可以

 
   
   
 
  1. public static class helloString<I extends Iface> extends org.apache.thrift.ProcessFunction<I, helloString_args> {

  2.      public helloString() {

  3.        super("helloString");

  4.      }

  5.      public helloString_args getEmptyArgsInstance() {

  6.        return new helloString_args();

  7.      }

  8.      protected boolean isOneway() {

  9.        return false;

  10.      }

  11.      @Override

  12.      protected boolean handleRuntimeExceptions() {

  13.        return false;

  14.      }

  15.      public helloString_result getResult(I iface, helloString_args args) throws org.apache.thrift.TException {

  16.        helloString_result result = new helloString_result();

  17.        result.success = iface.helloString(args.what);

  18.        return result;

  19.      }

  20.    }

处理类,继承ProcessFunction类,实现getResult(),这个方法就是调用了对应service.helloString()

可以再深入看一下,在socket监听消息时

 
   
   
 
  1. client = serverTransport_.accept();

  2.        if (client != null) {

  3.          processor = processorFactory_.getProcessor(client);

  4.          inputTransport = inputTransportFactory_.getTransport(client);

  5.          outputTransport = outputTransportFactory_.getTransport(client);

  6.          inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);

  7.          outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);

  8.          if (eventHandler_ != null) {

  9.            connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);

  10.          }

  11.          while (true) {

  12.            if (eventHandler_ != null) {

  13.              eventHandler_.processContext(connectionContext, inputTransport, outputTransport);

  14.            }

  15.            if(!processor.process(inputProtocol, outputProtocol)) {

  16.              break;

  17.            }

  18.          }

关键行:processor.process(inputProtocol, outputProtocol)

 
   
   
 
  1. public boolean process(TProtocol in, TProtocol out) throws TException {

  2.    TMessage msg = in.readMessageBegin();

  3.    ProcessFunction fn = processMap.get(msg.name);

  4.    if (fn == null) {

  5.      TProtocolUtil.skip(in, TType.STRUCT);

  6.      in.readMessageEnd();

  7.      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");

  8.      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));

  9.      x.write(out);

  10.      out.writeMessageEnd();

  11.      out.getTransport().flush();

  12.      return true;

  13.    }

  14.    fn.process(msg.seqid, in, out, iface);

  15.    return true;

  16.  }

这就很明显了,通过methodName从map中取得ProccessFunction,再执行process方法,调用相应service的方法

总结

虽然thrift没有按以往java套路出牌,但最根本的把method发送到远程执行是一致的。可能对于多语言来讲,便于所以语言一致性,的确需要通过生成的stub代码手法来实现RPC

当然thrift并不简单,还有很多的内容需要深挖学习,但至少这个简单示例可以了解跨语言型的RPC,相关IDL,Stub的知识,有清晰认知,而不局限于概念


以上是关于再识RPC-thrift的主要内容,如果未能解决你的问题,请参考以下文章

RPC-Thrift

课程16Python再识函数

再识angular

再识spark

再识网络编程

再识iptables规则