HadoopRPC在client端的源码解析

Posted Gendan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HadoopRPC在client端的源码解析相关的知识,希望对你有一定的参考价值。

@Override

public Object invoke(Object proxy, Method method, Object[] args)
        throws ServiceException {
    long startTime = 0;
    if (LOG.isDebugEnabled()) {
        startTime = Time.now();
    }
    //判断传入的两个参数,实际上调用的rename是ClientNamenodeProtocolTranslatorPB的rename
    //传入两个参数如下一个是null
    // 另一个是将请求的参数从字符串转换为Protocolbuf的请求
    // RenameRequestProto req = RenameRequestProto.newBuilder().setSrc(src).setDst(dst).build();
    // rpcProxy.rename(null, req).getResult();
    if (args.length != 2) { // RpcController + Message
        throw new ServiceException("Too many parameters for request. Method: ["
                + method.getName() + "]" + ", Expected: 2, Actual: "
                + args.length);
    }
    if (args[1] == null) {
        throw new ServiceException("null param while calling Method: ["
                + method.getName() + "]");
    }
    TraceScope traceScope = null;
    // if Tracing is on then start a new span for this rpc.
    // guard it in the if statement to make sure there isn\'t
    // any extra string manipulation.
    if (Trace.isTracing()) {
        traceScope = Trace.startSpan(
                method.getDeclaringClass().getCanonicalName() +
                        "." + method.getName());
    }
    //构造请求头域,标明在什么借口上调用什么方法
    RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
    
    if (LOG.isTraceEnabled()) {
        LOG.trace(Thread.currentThread().getId() + ": Call -> " +
                remoteId + ": " + method.getName() +
                " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
    }
    //获取实际的请求参数,
    Message theRequest = (Message) args[1];
    final RpcResponseWrapper val;
    try {
        //真正将数据发送给远端服务!!!
        val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
                new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
                fallbackToSimpleAuth);
    } catch (Throwable e) {
        if (LOG.isTraceEnabled()) {
            LOG.trace(Thread.currentThread().[PerfectMoney下载](https://www.gendan5.com/wallet/PerfectMoney.html)getId() + ": Exception <- " +
                    remoteId + ": " + method.getName() +
                    " {" + e + "}");
        }
        if (Trace.isTracing()) {
            traceScope.getSpan().addTimelineAnnotation(
                    "Call got exception: " + e.getMessage());
        }
        throw new ServiceException(e);
    } finally {
        if (traceScope != null) traceScope.close();
    }
    if (LOG.isDebugEnabled()) {
        long callTime = Time.now() - startTime;
        LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
    }
    Message prototype = null;
    try {
        //获得返回参数
        prototype = getReturnProtoType(method);
    } catch (Exception e) {
        throw new ServiceException(e);
    }
    Message returnMessage;
    try {
        //序列化相应信息并返回
        returnMessage = prototype.newBuilderForType()
                .mergeFrom(val.theResponseRead).build(); 
        if (LOG.isTraceEnabled()) {
            LOG.trace(Thread.currentThread().getId() + ": Response <- " +
                    remoteId + ": " + method.getName() +
                    " {" + TextFormat.shortDebugString(returnMessage) + "}");
        } 
    } catch (Throwable e) {
        throw new ServiceException(e);
    }
    //返回结果
    return returnMessage;
}

以上是关于HadoopRPC在client端的源码解析的主要内容,如果未能解决你的问题,请参考以下文章

自编写RPC通信实例解析HadoopRPC通信原理

Hadoop源码解析之 rpc通信 client到server通信

ribbon源码解析

CAS集成源码解析

canal 源码解析系列-canal的HA机制解析

Eureka源码解析:eureka-client启动原理分析