Spark 1.6 RPC Internals Revealed: A Source-Code Walkthrough of the Runtime Mechanism, Netty, Akka, and More (DT大数据梦工厂)
Contents:
1. Overview of RPC in Spark 1.6;
2. RpcEnv source analysis;
3. RpcEndpoint and related source analysis;
Earlier Spark versions did RPC directly on top of Akka, and Akka is still shipped as one implementation; what is new is RpcEnv: all RPC communication now goes through the RpcEnv abstraction instead of talking to Akka directly.
==========Spark 1.6 RPC Overview============
1. Spark 1.6 introduces a new RPC architecture built around RpcEnv, RpcEndpoint, and RpcEndpointRef. Underneath, both an Akka-based and a Netty-based implementation are shipped, with Netty as the default (see the factory code later in this post);
2. Akka is a distributed message-passing system based on Scala's Actors. Spark 1.6 wraps such messaging behind a higher-level RPC abstraction in order to remove the hard dependency on Akka and to lay the groundwork for extensible, custom RPC implementations;
==========RpcEnv Source Analysis============
1. RpcEnv is the RPC environment (the counterpart of ActorSystem in Akka). Every RpcEndpoint must be registered with an RpcEnv instance under a name; a client can then look up that RpcEndpoint's RpcEndpointRef by the name and communicate through it (see the sketch after the excerpt below). When an RpcEndpoint receives a message, its receive method is invoked to process it. Every RpcEndpoint belongs to an RpcEnv;
/**
 * An RPC environment. [[RpcEndpoint]]s need to register themselves with a name to [[RpcEnv]] to
 * receive messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
 * nodes, and deliver them to the corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by
 * [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the
 * sender, or log them if there is no such sender or the exception is a `NotSerializableException`.
 *
 * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given a name or URI.
*/
private[spark] abstract class RpcEnv(conf: SparkConf) {
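To make the register-then-look-up flow concrete, here is a minimal sketch against the Spark 1.6 API. The system name "demo", the endpoint name "echo", the address, and the EchoEndpoint class (sketched later in this post) are invented for illustration; env stands for an RpcEnv instance created as shown at the end of this post:
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}

// Server side: register the endpoint under a name so it can receive messages.
val echoRef: RpcEndpointRef = env.setupEndpoint("echo", new EchoEndpoint(env))
// Client side: resolve a reference by system name, remote address, and endpoint name,
// then communicate through the returned RpcEndpointRef.
val remoteRef: RpcEndpointRef =
  env.setupEndpointRef("demo", RpcAddress("localhost", 9999), "echo")
remoteRef.send("hello")  // fire-and-forget; dispatched to the endpoint's receive method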
2. RpcTimeoutException
In the snippet below, if no result comes back within the specified time, an RpcTimeoutException is thrown; a usage sketch follows the excerpt:
/**
 * Wait for the completed result and return it. If the result is not available within this
 * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
 * @param awaitable the `Awaitable` to be awaited
 * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
 *         is still not ready
 */
def awaitResult[T](awaitable: Awaitable[T]): T = {
  try {
    Await.result(awaitable, duration)
  } catch addMessageIfTimeout
}
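For example, RpcEndpointRef.ask returns a Future, and awaitResult blocks on it, rethrowing a timeout as an RpcTimeoutException whose message names the controlling configuration property. A sketch assuming the Spark 1.6 API, where conf is a SparkConf and remoteRef is the hypothetical reference from the earlier sketch (spark.rpc.askTimeout is a real Spark setting):
import scala.concurrent.Future
import org.apache.spark.rpc.RpcTimeout

// Build an RpcTimeout from a config property; the property name travels with the
// timeout so a failure message can tell the user which setting to tune.
val timeout = RpcTimeout(conf, "spark.rpc.askTimeout", "120s")
val reply: Future[String] = remoteRef.ask[String]("ping", timeout)
// Blocks for at most the timeout; on expiry an RpcTimeoutException
// mentioning spark.rpc.askTimeout is thrown.
val answer: String = timeout.awaitResult(reply)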
3. endpointRef
/**
 * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
 * [[RpcEndpoint.self]]. Return `null` if the corresponding [[RpcEndpointRef]] does not exist.
 */
private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
4. An RpcEndpoint processes messages through receive and receiveAndReply. A message that expects an answer is handled by receiveAndReply (the endpoint replies to the sender through the reply method of the RpcCallContext), while a message that needs no answer goes to receive; a combined sketch follows the two excerpts below:
/**
 * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply]]. If receiving an
 * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
 */
def receive: PartialFunction[Any, Unit] = {
  case _ => throw new SparkException(self + " does not implement 'receive'")
}
/**
 * Process messages from [[RpcEndpointRef.ask]]. If receiving an unmatched message,
 * [[SparkException]] will be thrown and sent to `onError`.
 */
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}
5. RpcEnvFactory is responsible for creating the RpcEnv: its create method builds the RpcEnv instance, and as the lookup below shows, the default implementation is Netty; a creation sketch follows the excerpt:
/**
 * An RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
 * so that it can be created via Reflection.
 */
private[spark] object RpcEnv {

  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
    val rpcEnvNames = Map(
      "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
      "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
    val rpcEnvName = conf.get("spark.rpc", "netty")
    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
    Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
  }
  // ...
}
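Creation goes through this factory indirection via RpcEnv.create. A minimal sketch assuming the Spark 1.6 signature (the name, host, and port are invented; SecurityManager is Spark's internal security helper):
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

val conf = new SparkConf()
// "netty" is already the default; set spark.rpc to "akka" to pick AkkaRpcEnvFactory instead.
conf.set("spark.rpc", "netty")
// create() resolves the factory as shown above and asks it to build the actual RpcEnv.
val env: RpcEnv = RpcEnv.create("demo", "localhost", 9999, conf, new SecurityManager(conf))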
6. The lifecycle of an RpcEndpoint: construction (constructor) -> startup (onStart) -> message handling (receive & receiveAndReply) -> shutdown (onStop). Each stage maps to a hook on the endpoint, as the hypothetical sketch below shows:
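A minimal sketch (LifecycleEndpoint is invented; onStart and onStop are the real hooks on Spark 1.6's RpcEndpoint):
import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

// Hypothetical endpoint tracing its own lifecycle.
class LifecycleEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  println("1. constructed")  // constructor: the instance is built

  // 2. Called by the RpcEnv after registration, before any message is delivered.
  override def onStart(): Unit = println("2. started")

  // 3. Messages are handled for as long as the endpoint is registered.
  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"3. received: $msg")
  }

  // 4. Called when the endpoint is stopped or its RpcEnv shuts down.
  override def onStop(): Unit = println("4. stopped")
}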