Hadoop RPC 探究
Posted 小米运维
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop RPC 探究相关的知识,希望对你有一定的参考价值。
本篇文章介绍了Hadoop 中各个节点如何通过 RPC 协议来进行通信的,同时说明了思考过程,易于理解。
回顾上篇文章:
前言
今天通过一个小例子来说明一下在 Hadoop 中各个节点如何通过 RPC 协议来进行通信的。使用 Hadoop 中节点间的通信方式来写一个 Server 和 Client。Client 端提供两个数字,传输给 server 端,由 Server 端计算两个数字的和返回结果在 Client 端显示。
首先要说明一下,Hadoop 是 java 写的,其中各个服务间通信的消息需要序列化后传输,使用的序列化工具是 protocol buffer(不过多介绍,下面有使用)。
定义协议
最开始我们需要定义一个协议 (protocol)。什么是协议?在 java 中就可以理解为接口,定义方法用的。换句话说就是先要定义一个接口,这个接口告诉了客户端和服务端需要做的事情,今天我们要做的是两个数字的加法。那么我们定义一个接口,非常简单 (要懂一点点 java 才行)。
接口中就一个方法 add
public interface CalculateProtocol {
public int add(int num1,int num2);
}
接口定义完了,当然需要实现类,不然接口存在是没有意义的。定义一个实现,也就是具体的做法 (两个数据相加):
public class CalculateProtocolImpl implements CalculateProtocol{
@Override
public int add(int num1, int num2) {
return num1+num2;
}
}
序列化
在不涉及网络通信的情况下,已经可以写一个 main 函数来两个数相加了。但是涉及到网络传输,就需要刚刚提到的序列化。上述的 add 方法的参数是非序列化的,是不可以在网络中传输的。那么 Hadoop 是怎么操作的呢?把我们刚刚定义的协议 (接口) 中的方法以及方法参数全部序列化。这里就用到了 protocol buffer 了(需要编译安装才能使用)。
定义 2 个 protocol buffer 文件,分别定义一下序列化的参数和序列化的方法,具体内容如下。
CalculateMessage.proto:
option java_outer_classname="CalculateMessage";
option java_generic_services=true;
option java_generate_equals_and_hash=true;
message RequestProto {
required int32 num1=2;
required int32 num2=3;
}
message ResopnseProto {
required int32 result=1;
}
这里可以看到,我们把 num1 和 num2 变成了 RequestProto,把计算结果变成了 ResponseProto。
另外一个 CalculateServer.proto:
option java_outer_classname="CalculateServer";
option java_generic_services=true;
option java_generate_equals_and_hash=true;
import "CalculateMessage.proto";
service CalculateService {
rpc add(RequestProto) returns (ResopnseProto);
}
这里,我们把方法 add 也做了序列化,参数和返回结果分别是 CalculateMessage.proto 文件中的 RequestProto 和 ResopnseProto。
完成之后就可以使用 proto 命令编译这两个文件:
protoc --proto_path=./ --java_out ./ ./CalculateMessage.proto
protoc --proto_path=./ --java_out ./ ./CalculateServer.proto
编译完之后会生成两个 java 文件,这两个 java 文件的名字在 proto 文件中已经有定义 (开头的 java_outer_classname)。这两个类代码都比较多,咱们不需要关心,后面用一下就行了。
接下来该干什么呢?
我们需要定义一个序列化的接口
CalculateProtocolPB:
@ProtocolInfo(protocolName ="com.xiaomi.chy.RPC.CalculateProtocolPB", protocolVersion = 1)
public interface CalculateProtocolPB extends BlockingInterface{
}
什么都不用做,就继承 BlockingInterface 这个接口就行 (刚刚 proto 文件编译生成的)
这样我们就已经定义好了需要做的事情 (两个数相加,也就是非序列化的接口)、序列化参数以及定义序列化的接口。接下来要做的就是如何使用这些定义好的接口了。
适配器模式
客户端需要做什么操作呢?当然是调用普通的方法,也就是非序列化的方法,还需要一个适配的类把这个调用转化成为对序列化方法的调用。
服务端呢?服务端接收到了序列化的信息,当然调用了序列化的方法,同样也需要一个适配的类,把对序列化的方法的调用转化成为对非序列化方法的调用,才能得到我们想要的执行结果。这里就用到了设计模式的适配器模式。
先定义客户端的适配类,
CalculateProtolTranslatorPB:
public class CalculateProtocolTranslatorPB implements CalculateProtocol{
final private CalculateProtocolPB rpcProxy;
public CalculateProtocolTranslatorPB(CalculateProtocolPB proxy){
this.rpcProxy = proxy;
}
@Override
public int add(int num1, int num2) {
RequestProto req = RequestProto.newBuilder()
.setNum1(num1).setNum2(num2)
.build();
ResopnseProto resp;
try {
resp = rpcProxy.add(null, req);
return resp.getResult();
} catch (ServiceException e) {
e.printStackTrace();
}
return 0;
}
}
这里的适配类实现了 CalculateProtocol 这个非序列化的接口,当然就实现了其中的非序列化方法,也就是 add 方法。最终客户端调用的也是这个类的 add 方法。这里的 add 方法,用了 int 类型的参数,方法中将 int 类型的参数转化成了序列化的参数,然后调用了序列化的 add 方法,返回了结果 (resp.getResult()),这个返回结果一个数字,是非序列化的,也就是最终结果。
然后定义服务端的适配类,
CalculateProtocolServerSideTranslatorPB:
public class CalculateProtocolServerSideTranslatorPB implements CalculateProtocalPB{
private final CalculateProtocol server;
public CalculateProtocolServerSideTranslatorPB(CalculateProtocol server){
this.server = server;
}
@Override
public ResopnseProto add(RpcController controller, RequestProto request) throws ServiceException {
ResopnseProto.Builder builder = CalculateMessage.ResopnseProto.newBuilder();
int result = server.add(request.getNum1(), request.getNum2());
builder.setResult(result);
return builder.build();
}
}
这里的适配类实现了 CalculateProtocolPB 这个序列化的接口,当然也就实现了其中的序列化方法。最终服务端调用的也是这个类来得到最终的结果。这里的 add 方法,用了序列化的参数,方法中将序列化的参数转化成了非序列化参数 (request.getNum1() 方法得到),然后调用了非序列化的 add 方法得到最终的结果,并将其反序列化后返回给客户端。
到此为止处理逻辑的代码全部都搞定了,接下来只剩客户端代码和服务端代码了。
服务端和客户端
先来服务端:Server 类
public class Server{
public static void main(String [] args) throws HadoopIllegalArgumentException, IOException {
Configuration conf = new HdfsConfiguration();
RPC.setProtocolEngine(conf, CalculateProtocolPB.class, ProtobufRpcEngine.class);
RPC.Server server = new RPC.Builder(conf)
.setProtocol(CalculateProtocolPB.class)
.setInstance(CalculateService.newReflectiveBlockingService(new CalculateProtocolServerSideTranslatorPB(new CalculateProtocolImpl())))
.setBindAddress("127.0.0.1")
.setPort(9999).setNumHandlers(1).setVerbose(true)
.build();
server.start();
}
}
处理很简单,就是指定下序列化的类 (setProtocol) 、实例 (setInstance、以及 ip 端口,构造出 RPC.Server,然后启动。
接下来是客户端:Client 类
public class Client {
final static long VERSION = 1;
public static void main(String [] args) throws IOException{
Configuration conf = new HdfsConfiguration();
RPC.setProtocolEngine(conf, CalculateProtocolPB.class, ProtobufRpcEngine.class);
CalculateProtocolPB proxy = RPC.getProxy(CalculateProtocolPB.class,VERSION,new InetSocketAddress("127.0.0.1",9999), conf);
CalculateProtocolTranslatorPB cal = new CalculateProtocolTranslatorPB(proxy);
int addresult = cal.add(1, 2);
System.out.println(addresult);
}
}
核心就是得到序列化接口 CalculateProtocalPB 的实例 (这里是 proxy),然后初始化出客户端的适配类 CalculateProtocolTranslatorPB 的实例,最后调用 add 方法即可。
运行 Server 类,再运行 Client 类就可以看到结果了。(1+2=3)
通信过程
到这里,客户端究竟是什么时候把数据发出去的呢?好像没看到啊?答案就在 Client 类中执行 add 方法的时候 (cal.add(1,2))。这个 add 方法,传入的是数字,是非序列化的。之前提到过适配器,这里就使用了这个适配器模式,将调用的非序列化方法适配成了序列化方法。可以看到,客户端实际调用的 add 方法(cal.add(1,2)) 被适配成了 proxy 实例的 add 方法(参看一下 CalculateProtocolTranslatorPB 这个类)。
而为了得到这个实例,调用了 RPC.getProxy()这个方法。这个方法得到的 CalculateProtocolPB 的实例其实是一个代理实例 (使用了动态代理,Hadoop 里大量使用了动态代理)。动态代理的好处就是在调用 proxy 实例的 add 方法之前可以加一点别的操作,即在客户端执行 cal.add(1,2) 的时候,看起来是直接执行了 1+2 操作,其实并没有真正在这里计算 1+2,而是在这之前将 1 和 2 这两个参数传给了服务端,服务端接收到参数后反序列化进行计算(参看一下 CalculateProtocolServerSideTranslatorPB 这个类),然后返回给客户端结果。最终现象感觉就是像直接在客户端计算了结果,并没有服务端的存在。这也是 RPC 通信的特点,让客户端无感知,调用方法就好像调用本地方法一样。
Hadoop 中,存在各类服务的通信。Namenode 和客户端、Namenode 和 Datanode、Datanode 和 Datanode。这些服务间的通信原理与今天举的例子一模一样。可以看到,所有的代码均没有涉及到具体的网络通信操作,因为 Hadoop 已经把这些底层的操作全都封装好了,我们今天所做的就是编写 proto 文件并编译生成 java 文件、定义接口 (序列化和非序列化)、定义适配类,然后仿照源码完成方法的编写,即可完成 Client 到 Server 的通信。
以 Namenode 和客户端交互来说,列一下源码中的类和上述例子的对比。从命名方式到具体代码的编写,都是按照 Hadoop 的套路来的。
hadoop 源码类 |
例子中的类 |
ClientProtocol(非序列化接口,定义了客户端和 namenode 之间交互的所有方法) |
CalculateProtocol |
NameNodeRpcServer(namenode 真正提供服务的类,这个类中的方法会具体的和文件系统打交道) |
CalculateProtocolImpl |
ClientProtocolPB(序列化接口) |
CalculateProtocolPB |
ClientProtocolTranslatorPB(客户端适配类) |
CalculateProtocolTranslatorPB |
ClientProtocolServerSideTranslatorPB(服务端适配类) |
CalculateProtocolServerSideTranslatorPB |
小结
当然,虽然原理一致,今天的例子还是很简单的,真正的 Hadoop 的代码还是非常复杂的,其实各种动态代理的使用,非常的绕,有兴趣的同学可以去研究研究。
一键关注,小米运维(。•ˇ‸ˇ•。)
以上是关于Hadoop RPC 探究的主要内容,如果未能解决你的问题,请参考以下文章