Spark 1.6 RPC内幕解密:运行机制源码详解Netty与Akka等(DT大数据梦工厂)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 1.6 RPC内幕解密:运行机制源码详解Netty与Akka等(DT大数据梦工厂)相关的知识,希望对你有一定的参考价值。

内容:

1、Spark 1.6 RPC解析;

2、RPCEnv源码解析;

3、RPCEndpoint等源码解析;

以前和现在的RPC都是采用Akka,以前和现在的不同就在于RPCEnv,现在就是基于RPCEnv去做RPC通信的

==========Spark 1.6 RPC解析============

1、Spark 1.6推出了以RPCEnv、RPCEndpoint、RPCEndpointRef为核心的新型架构下的RPC通信方式,就目前的实现而言,其底层依旧是Akka;

2、Akka是基于Scala的Actor的分布式消息通信系统,而在Spark 1.6中封装了Akka提供更高层的RPC实现,目的是移除对Akka的依赖,为扩展和自定义RPC打下基础;

==========RPCEnv源码解析============

1、RPCEnv是RPC的环境(相当于Akka中的ActorSsytem),所有的RPCEndpoint都需要注册到RPCEnv的实例对象中(注册的时候会指定注册的名称,这样客户端就可以通过名称查询到RPCEndpoint的RPCEndpointRef引用进而进行通信),在RPCEndpoint接收到消息后会调用receive方法进行处理,

所有的RPCEndpoint都是属于RPCEnv的;

/**
 * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
 * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
 * nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by
 * [[RpcEnv]][[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the
 * sender, or logging them if no such sender or `NotSerializableException`.
 *
 * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
 */
private[spark] abstract class RpcEnv(conf: SparkConf) {

技术分享

2、RPCTimeoutException

下面这段,在指定时间内没结果返回,就会抛出timeout的异常

/**
 * Wait for the completed result and return it. If the result is not available within this
 * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
 * @param  awaitable  the `Awaitableto be awaited
 * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
 *         is still not ready
 */
def awaitResult[T](awaitable: Awaitable[T]): = {
  try {
    Await.result(awaitableduration)
  } catch addMessageIfTimeout
}

3、EndpointRef

/**
 * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
 * [[RpcEndpoint.self]]. Return `nullif the corresponding [[RpcEndpointRef]] does not exist.
 */
private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef

4、RPCEndpoint通过receive receiveAndReply来处理消息,如果接收到需要reply的消息的话,就会交给自己的receiveAndReply(回复时候是通过RPCCallContext中的reply方法来回复发送者的),如果不需要reply的话,就交给receive方法来处理;;

/**
 * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a
 * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
 */
def receive: PartialFunction[Any, Unit] = {
  case _ => throw new SparkException(self + " does not implement ‘receive‘")
}

/**
 * Process messages from [[RpcEndpointRef.ask]]. If receiving a unmatched message,
 * [[SparkException]] will be thrown and sent to `onError`.
 */
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case _ => context.sendFailure(new SparkException(self + " won‘t reply anything"))
}

4、RPCEnvFactory是负责创建RPCEnv的,通过create方法创建RPCEnv实例对象,默认是Netty,

/**
 * A RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
 * so that it can be created via Reflection.
 */
private[spark] object RpcEnv {

  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
    val rpcEnvNames = Map(
      "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
      "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
    val rpcEnvName = conf.get("spark.rpc""netty")
    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCaserpcEnvName)
    Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
  }

5、RPCEndpoint的生命周期:构造(Constructor)->启动(onStart)->消息接收(receive&receiveAndReply)->停止(onStop)

王家林老师名片:

中国Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公众号:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手机:18610086859

QQ:1740415547

邮箱:[email protected]


本文出自 “一枝花傲寒” 博客,谢绝转载!

以上是关于Spark 1.6 RPC内幕解密:运行机制源码详解Netty与Akka等(DT大数据梦工厂)的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming源码解读之数据清理内幕彻底解密

打通Spark系统运行内幕机制循环流程(DT大数据梦工厂)

Spark Executor内幕彻底解密:Executor工作原理图ExecutorBackend注册源码解密Executor实例化内幕Executor具体工作内幕

Spark Executor内幕彻底解密(DT大数据梦工厂)

Spark 定制版:016~Spark Streaming源码解读之数据清理内幕彻底解密

第16课:Spark Streaming源码解读之数据清理内幕彻底解密