Spark Executor Internals Completely Demystified (DT大数据梦工厂)
Contents:
1. The Spark Executor working-principle diagram;
2. ExecutorBackend registration, decoded from the source;
3. Executor instantiation internals;
4. How exactly does the Executor work?
1. The Master sends an instruction to a Worker to launch an Executor;
2. The Worker receives the Master's instruction and, via ExecutorRunner, launches a separate process to run the Executor;
3. That process starts the coarse-grained ExecutorBackend (CoarseGrainedExecutorBackend);
4. CoarseGrainedExecutorBackend registers with the Driver by sending a RegisterExecutor message;
5. Once the Executor registers successfully, the Driver replies to the CoarseGrainedExecutorBackend with a RegisteredExecutor message (the messages involved are sketched just below);
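For reference, here is a condensed sketch of the messages that drive this handshake. The names come from Spark's CoarseGrainedClusterMessages, but the field lists are simplified here, so treat this as illustration rather than the exact definitions:

case class RegisterExecutor(          // ExecutorBackend -> Driver
    executorId: String,
    hostPort: String,
    cores: Int)

sealed trait RegisterExecutorResponse // Driver -> ExecutorBackend
case class RegisteredExecutor(hostname: String) extends RegisterExecutorResponse
case class RegisterExecutorFailed(message: String) extends RegisterExecutorResponse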
==========Spark Executor Working-Principle Diagram============
1. Note in particular that when CoarseGrainedExecutorBackend starts up and registers with the Driver, what actually gets registered is the ExecutorBackend instance; at that point it has no direct relationship to any Executor instance;
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[RegisterExecutorResponse](
      RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) => Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
    }
    case Failure(e) => {
      logError(s"Cannot register with driver: $driverUrl", e)
      System.exit(1)
    }
  }(ThreadUtils.sameThread)
}
2. CoarseGrainedExecutorBackend is the name of the process in which the Executor runs; the Executor itself is where Task objects are actually processed, and internally it completes Task computation via a thread pool;
3. CoarseGrainedExecutorBackend and Executor correspond one to one;
4. CoarseGrainedExecutorBackend is a message communication endpoint (it implements ThreadSafeRpcEndpoint): it can send messages to the Driver and receive the Driver's instructions, such as launching a Task;
5. Inside the Driver process there are two crucial endpoints:
1) ClientEndpoint: mainly responsible for registering the current application with the Master; it is an inner member of AppClient;
2) DriverEndpoint: the driver of the whole application at runtime; it is an inner member of CoarseGrainedSchedulerBackend; it is what receives the RegisterExecutor message and completes the registration inside the Driver;
6. Inside the Driver, the ExecutorBackend information is wrapped in ExecutorData and registered into the Driver's in-memory data structure executorDataMap (a member of CoarseGrainedSchedulerBackend);
7. At execution time DriverEndpoint writes this information into CoarseGrainedSchedulerBackend's in-memory data structure executorDataMap, so the registration ultimately lands in CoarseGrainedSchedulerBackend; in other words, CoarseGrainedSchedulerBackend keeps track of all the ExecutorBackend processes allocated to the current application, and inside each ExecutorBackend process an Executor object is responsible for actually running Tasks. At runtime the synchronized keyword guards concurrent writes to executorDataMap;
8. After CoarseGrainedExecutorBackend receives the RegisteredExecutor message sent back by DriverEndpoint, it creates the Executor instance object, and that Executor instance is what actually carries out Task computation (see the receive handler sketched below);
9. The Executor's ThreadPool executes the Tasks Spark sends over with concurrent, reused threads for efficiency; on receiving a command to execute a Task, it first wraps the Task in a TaskRunner (a toy model appears at the end of this section).
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
    if (executorDataMap.contains(executorId)) {
      context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    } else {
      // If the executor's rpc env is not listening for incoming connections, `hostPort`
      // will be null, and the client connection should be used to contact the executor.
      val executorAddress = if (executorRef.address != null) {
        executorRef.address
      } else {
        context.senderAddress
      }
      logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
      addressToExecutorId(executorAddress) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
        cores, cores, logUrls)
      // This must be synchronized because variables mutated
      // in this block are read when requesting executors
      CoarseGrainedSchedulerBackend.this.synchronized {
        executorDataMap.put(executorId, data)
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
          logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
        }
      }
      // Note: some tests expect the reply to come after we put the executor in the map
      context.reply(RegisteredExecutor(executorAddress.host))
      listenerBus.post(
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      makeOffers()
    }
}
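The makeOffers() call at the end of the handler is what turns a freshly registered ExecutorBackend into schedulable resources. A simplified sketch of it, based on the same Spark code base (the real method also filters out executors that are being killed):

private def makeOffers() {
  // Build a WorkerOffer per executor recorded in executorDataMap...
  val workOffers = executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  // ...and let the TaskScheduler decide which tasks to launch where
  launchTasks(scheduler.resourceOffers(workOffers))
}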
private[cluster] class ExecutorData(
    val executorEndpoint: RpcEndpointRef,
    val executorAddress: RpcAddress,
    override val executorHost: String,
    var freeCores: Int,
    override val totalCores: Int,
    override val logUrlMap: Map[String, String]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
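On the other side of the wire, CoarseGrainedExecutorBackend handles the RegisteredExecutor reply in its receive method. Roughly, in the same Spark version (a condensed sketch, other cases omitted), this is the moment the Executor object from point 8 actually comes into existence:

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor(hostname) =>
    logInfo("Successfully registered with driver")
    // Registration succeeded: only now is the real worker object created
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

  case RegisterExecutorFailed(message) =>
    logError("Slave registration failed: " + message)
    System.exit(1)
}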
// Start worker thread pool
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
private val executorSource = new ExecutorSource(threadPool, executorId)
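ThreadUtils.newDaemonCachedThreadPool is a small Spark utility; a minimal sketch of what it amounts to (written here against only the standard java.util.concurrent API) is an unbounded cached pool whose threads are daemons, so they are reused across Tasks and never keep the JVM alive on their own:

import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

def newDaemonCachedThreadPool(prefix: String): ExecutorService = {
  val threadCount = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${threadCount.incrementAndGet()}")
      t.setDaemon(true) // daemon threads do not block JVM shutdown
      t
    }
  }
  Executors.newCachedThreadPool(factory)
}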
==========How the Executor Works============
1. When the Driver sends over a Task, it actually sends it to the CoarseGrainedExecutorBackend RpcEndpoint, not directly to the Executor (the Executor is not a message loop, so it can never receive remotely sent messages);
2. After ExecutorBackend receives the message sent from the Driver, it calls launchTask to hand the Task to the Executor, which in turn hands it to a thread in the thread pool to process;
3. TaskRunner is in fact a concrete implementation of Java's Runnable interface; when the real work happens it is handed to the thread pool, a pooled thread runs it, and its run method is invoked to execute the Task;
4. When TaskRunner's run method runs, it calls the Task's run method, and Task's run method calls runTask; the concrete Tasks are ShuffleMapTask and ResultTask (a self-contained toy model of this chain follows the code below).
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
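To make the run -> runTask chain from point 4 concrete, here is a self-contained toy model (hypothetical names, not Spark source): a pooled thread runs TaskRunner.run, which calls Task.run, which dispatches to the concrete runTask of a ShuffleMapTask or ResultTask:

import java.util.concurrent.Executors

abstract class Task(val taskId: Long) {
  // Template method: shared bookkeeping lives here...
  final def run(): Unit = {
    println(s"[task $taskId] starting")
    runTask() // ...the concrete work is deferred to the subclass
  }
  protected def runTask(): Unit
}

class ShuffleMapTask(id: Long) extends Task(id) {
  protected def runTask(): Unit = println(s"[task $taskId] writing shuffle output")
}

class ResultTask(id: Long) extends Task(id) {
  protected def runTask(): Unit = println(s"[task $taskId] computing final result")
}

// The Runnable wrapper handed to the thread pool, like Spark's TaskRunner
class TaskRunner(task: Task) extends Runnable {
  override def run(): Unit = task.run()
}

object ToyExecutor extends App {
  val threadPool = Executors.newCachedThreadPool()
  Seq(new ShuffleMapTask(0), new ResultTask(1))
    .foreach(t => threadPool.execute(new TaskRunner(t)))
  threadPool.shutdown()
}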