Hadoop RPC 探究

Posted 小米运维

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop RPC 探究相关的知识,希望对你有一定的参考价值。

本篇文章介绍了Hadoop 中各个节点如何通过 RPC 协议来进行通信的,同时说明了思考过程,易于理解。

回顾上篇文章:


前言


今天通过一个小例子来说明一下在 Hadoop 中各个节点如何通过 RPC 协议来进行通信的。使用 Hadoop 中节点间的通信方式来写一个 Server 和 Client。Client 端提供两个数字,传输给 server 端,由 Server 端计算两个数字的和返回结果在 Client 端显示。

首先要说明一下,Hadoop 是 java 写的,其中各个服务间通信的消息需要序列化后传输,使用的序列化工具是 protocol buffer(不过多介绍,下面有使用)。

定义协议


最开始我们需要定义一个协议 (protocol)。什么是协议?在 java 中就可以理解为接口,定义方法用的。换句话说就是先要定义一个接口,这个接口告诉了客户端和服务端需要做的事情,今天我们要做的是两个数字的加法。那么我们定义一个接口,非常简单 (要懂一点点 java 才行)。

接口中就一个方法 add

public interface CalculateProtocol {    
   public int add(int num1,int num2); }



接口定义完了,当然需要实现类,不然接口存在是没有意义的。定义一个实现,也就是具体的做法 (两个数据相加):

public class CalculateProtocolImpl implements CalculateProtocol{    
   @Override    public int add(int num1, int num2) {        
       return num1+num2;    } }



序列化


在不涉及网络通信的情况下,已经可以写一个 main 函数来两个数相加了。但是涉及到网络传输,就需要刚刚提到的序列化。上述的 add 方法的参数是非序列化的,是不可以在网络中传输的。那么 Hadoop 是怎么操作的呢?把我们刚刚定义的协议 (接口) 中的方法以及方法参数全部序列化。这里就用到了 protocol buffer 了(需要编译安装才能使用)。


定义 2 个 protocol buffer 文件,分别定义一下序列化的参数和序列化的方法,具体内容如下。


CalculateMessage.proto:

option java_outer_classname="CalculateMessage";
option java_generic_services=true;
option java_generate_equals_and_hash=true;

message RequestProto {
   required int32 num1=2;
   required int32 num2=3;
}

message ResopnseProto {
   required int32 result=1;
}


这里可以看到,我们把 num1 和 num2 变成了 RequestProto,把计算结果变成了 ResponseProto。


另外一个 CalculateServer.proto:

option java_outer_classname="CalculateServer";
option java_generic_services=true;
option java_generate_equals_and_hash=true;

import "CalculateMessage.proto"; service CalculateService {  
   rpc add(RequestProto) returns (ResopnseProto); }


这里,我们把方法 add 也做了序列化,参数和返回结果分别是 CalculateMessage.proto 文件中的 RequestProto 和 ResopnseProto。

完成之后就可以使用 proto 命令编译这两个文件:

protoc --proto_path=./ --java_out ./ ./CalculateMessage.proto
protoc --proto_path=./ --java_out ./ ./CalculateServer.proto


编译完之后会生成两个 java 文件,这两个 java 文件的名字在 proto 文件中已经有定义 (开头的 java_outer_classname)。这两个类代码都比较多,咱们不需要关心,后面用一下就行了。

接下来该干什么呢?

我们需要定义一个序列化的接口

CalculateProtocolPB:

@ProtocolInfo(protocolName ="com.xiaomi.chy.RPC.CalculateProtocolPB", protocolVersion = 1)  
public interface CalculateProtocolPB extends BlockingInterface{ }


什么都不用做,就继承 BlockingInterface 这个接口就行 (刚刚 proto 文件编译生成的)

这样我们就已经定义好了需要做的事情 (两个数相加,也就是非序列化的接口)、序列化参数以及定义序列化的接口。接下来要做的就是如何使用这些定义好的接口了。


适配器模式


客户端需要做什么操作呢?当然是调用普通的方法,也就是非序列化的方法,还需要一个适配的类把这个调用转化成为对序列化方法的调用。

服务端呢?服务端接收到了序列化的信息,当然调用了序列化的方法,同样也需要一个适配的类,把对序列化的方法的调用转化成为对非序列化方法的调用,才能得到我们想要的执行结果。这里就用到了设计模式的适配器模式。

先定义客户端的适配类,

CalculateProtolTranslatorPB:

public class CalculateProtocolTranslatorPB implements CalculateProtocol{    
   final private CalculateProtocolPB rpcProxy;    
   public CalculateProtocolTranslatorPB(CalculateProtocolPB proxy){              
       this.rpcProxy = proxy;    }    

   @Override    public int add(int num1, int num2) {             RequestProto req = RequestProto.newBuilder()                            .setNum1(num1).setNum2(num2)                            .build();               ResopnseProto resp;              
               try {                    resp = rpcProxy.add(null, req);                  
                   return resp.getResult();               } catch (ServiceException e) {                   e.printStackTrace();               }            
              return 0;     }   }


这里的适配类实现了 CalculateProtocol 这个非序列化的接口,当然就实现了其中的非序列化方法,也就是 add 方法。最终客户端调用的也是这个类的 add 方法。这里的 add 方法,用了 int 类型的参数,方法中将 int 类型的参数转化成了序列化的参数,然后调用了序列化的 add 方法,返回了结果 (resp.getResult()),这个返回结果一个数字,是非序列化的,也就是最终结果。


然后定义服务端的适配类,

CalculateProtocolServerSideTranslatorPB:

public class CalculateProtocolServerSideTranslatorPB implements CalculateProtocalPB{
private final CalculateProtocol server;    

   public CalculateProtocolServerSideTranslatorPB(CalculateProtocol server){            
       this.server = server;     }  
       
    @Override     public ResopnseProto add(RpcController controller, RequestProto request) throws ServiceException {         ResopnseProto.Builder builder = CalculateMessage.ResopnseProto.newBuilder();    
        int result = server.add(request.getNum1(), request.getNum2());         builder.setResult(result);    
        return builder.build();   } }


这里的适配类实现了 CalculateProtocolPB 这个序列化的接口,当然也就实现了其中的序列化方法。最终服务端调用的也是这个类来得到最终的结果。这里的 add 方法,用了序列化的参数,方法中将序列化的参数转化成了非序列化参数 (request.getNum1() 方法得到),然后调用了非序列化的 add 方法得到最终的结果,并将其反序列化后返回给客户端。

到此为止处理逻辑的代码全部都搞定了,接下来只剩客户端代码和服务端代码了。


服务端和客户端


先来服务端:Server 类

public class Server{
 public static void main(String [] args) throws HadoopIllegalArgumentException, IOException {    Configuration conf = new HdfsConfiguration();    RPC.setProtocolEngine(conf, CalculateProtocolPB.class, ProtobufRpcEngine.class);    RPC.Server server = new RPC.Builder(conf)                             .setProtocol(CalculateProtocolPB.class)                             .setInstance(CalculateService.newReflectiveBlockingService(new CalculateProtocolServerSideTranslatorPB(new CalculateProtocolImpl())))                            .setBindAddress("127.0.0.1")                            .setPort(9999).setNumHandlers(1).setVerbose(true)                            .build();        server.start(); } }


处理很简单,就是指定下序列化的类 (setProtocol) 、实例 (setInstance、以及 ip 端口,构造出 RPC.Server,然后启动。


接下来是客户端:Client 类

public class Client {
   final static long VERSION = 1;    
   public static void main(String [] args) throws IOException{        Configuration conf = new HdfsConfiguration();        RPC.setProtocolEngine(conf, CalculateProtocolPB.class, ProtobufRpcEngine.class);        CalculateProtocolPB proxy = RPC.getProxy(CalculateProtocolPB.class,VERSION,new InetSocketAddress("127.0.0.1",9999), conf);        CalculateProtocolTranslatorPB cal = new CalculateProtocolTranslatorPB(proxy);
       int addresult = cal.add(1, 2);        
       System.out.println(addresult); } }


核心就是得到序列化接口 CalculateProtocalPB 的实例 (这里是 proxy),然后初始化出客户端的适配类 CalculateProtocolTranslatorPB 的实例,最后调用 add 方法即可。

运行 Server 类,再运行 Client 类就可以看到结果了。(1+2=3)


通信过程

       

到这里,客户端究竟是什么时候把数据发出去的呢?好像没看到啊?答案就在 Client 类中执行 add 方法的时候 (cal.add(1,2))。这个 add 方法,传入的是数字,是非序列化的。之前提到过适配器,这里就使用了这个适配器模式,将调用的非序列化方法适配成了序列化方法。可以看到,客户端实际调用的 add 方法(cal.add(1,2)) 被适配成了 proxy 实例的 add 方法(参看一下 CalculateProtocolTranslatorPB 这个类)。


而为了得到这个实例,调用了 RPC.getProxy()这个方法。这个方法得到的 CalculateProtocolPB 的实例其实是一个代理实例 (使用了动态代理,Hadoop 里大量使用了动态代理)。动态代理的好处就是在调用 proxy 实例的 add 方法之前可以加一点别的操作,即在客户端执行 cal.add(1,2) 的时候,看起来是直接执行了 1+2 操作,其实并没有真正在这里计算 1+2,而是在这之前将 1 和 2 这两个参数传给了服务端,服务端接收到参数后反序列化进行计算(参看一下 CalculateProtocolServerSideTranslatorPB 这个类),然后返回给客户端结果。最终现象感觉就是像直接在客户端计算了结果,并没有服务端的存在。这也是 RPC 通信的特点,让客户端无感知,调用方法就好像调用本地方法一样。

Hadoop 中,存在各类服务的通信。Namenode 和客户端、Namenode 和 Datanode、Datanode 和 Datanode。这些服务间的通信原理与今天举的例子一模一样。可以看到,所有的代码均没有涉及到具体的网络通信操作,因为 Hadoop 已经把这些底层的操作全都封装好了,我们今天所做的就是编写 proto 文件并编译生成 java 文件、定义接口 (序列化和非序列化)、定义适配类,然后仿照源码完成方法的编写,即可完成 Client 到 Server 的通信。

以 Namenode 和客户端交互来说,列一下源码中的类和上述例子的对比。从命名方式到具体代码的编写,都是按照 Hadoop 的套路来的。


hadoop 源码类

例子中的类

ClientProtocol(非序列化接口,定义了客户端和 namenode 之间交互的所有方法)

CalculateProtocol

NameNodeRpcServer(namenode 真正提供服务的类,这个类中的方法会具体的和文件系统打交道)

CalculateProtocolImpl

ClientProtocolPB(序列化接口)

CalculateProtocolPB

ClientProtocolTranslatorPB(客户端适配类)

CalculateProtocolTranslatorPB

ClientProtocolServerSideTranslatorPB(服务端适配类)

CalculateProtocolServerSideTranslatorPB


小结


当然,虽然原理一致,今天的例子还是很简单的,真正的 Hadoop 的代码还是非常复杂的,其实各种动态代理的使用,非常的绕,有兴趣的同学可以去研究研究。








一键关注,小米运维(。•ˇ‸ˇ•。)


以上是关于Hadoop RPC 探究的主要内容,如果未能解决你的问题,请参考以下文章

RPC框架研究Hadoop源代码-1

Hadoop RPC通信机制

hadoop[6]-rpc

Hadoop HDFS编程 API入门系列之RPC版本2

Hadoop HDFS编程 API入门系列之RPC版本1

2018-07-23期 Hadoop RPC模拟NameNode