Spark-Luanch Driver
Posted eggplantpro
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark-Luanch Driver相关的知识,希望对你有一定的参考价值。
1.SparkSubmit.scala
主要调用M-prepareSubmitEnvironment,该方法更根据用户定义的参数,匹配不同client,去调用不同clientApp。(ps:本次讲ClientApp 也就是standalone)
在M-runMain通过 调用M-Utils.classForName 反射的方式调用 ClientApp 的 M-main (ps:如果是localhost 或者是client 直接反射用户的定义的main)
几种提交方式
// Following constants are visible for testing. private[deploy] val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication" private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName() private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName() private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS ="org.apache.spark.deploy.k8s.submit.KubernetesClientApplication" private[deploy] def prepareSubmitEnvironment( args: SparkSubmitArguments, conf: Option[HadoopConfiguration] = None) : (Seq[String], Seq[String], SparkConf, String)
2.ClientApp.scala
最后driver粗粒度就是DriverWrapper
通过Rpc 发送给driver
override def onStart(): Unit = { driverArgs.cmd match { case "launch" => val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" asyncSendToMasterAndForwardReply[SubmitDriverResponse](RequestSubmitDriver(driverDescription))
3.Master.scala
master 接受之后,放入map缓存中,调用M-schedule,根据资源选择一个work,向该work发送启动LaunchDriver的消息
case RequestSubmitDriver(description) => if (state != RecoveryState.ALIVE) { val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + "Can only accept driver submissions in ALIVE state." context.reply(SubmitDriverResponse(self, false, None, msg)) } else { logInfo("Driver submitted " + description.command.mainClass) val driver = createDriver(description) persistenceEngine.addDriver(driver) waitingDrivers += driver drivers.add(driver) schedule() // TODO: It might be good to instead have the submission client poll the master to determine // the current status of the driver. For now it‘s simply "fire and forget". context.reply(SubmitDriverResponse(self, true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")) } } private def schedule(): Unit = { if (state != RecoveryState.ALIVE) { return } // Drivers take strict precedence over executors val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) val numWorkersAlive = shuffledAliveWorkers.size var curPos = 0 for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers // We assign workers to each waiting driver in a round-robin fashion. For each driver, we // start from the last worker that was assigned a driver, and continue onwards until we have // explored all alive workers. var launched = false var numWorkersVisited = 0 while (numWorkersVisited < numWorkersAlive && !launched) { val worker = shuffledAliveWorkers(curPos) numWorkersVisited += 1 if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver launched = true } curPos = (curPos + 1) % numWorkersAlive } } startExecutorsOnWorkers() } private def launchDriver(worker: WorkerInfo, driver: DriverInfo) { logInfo("Launching driver " + driver.id + " on worker " + worker.id) worker.addDriver(driver) driver.worker = Some(worker) worker.endpoint.send(LaunchDriver(driver.id, driver.desc)) driver.state = DriverState.RUNNING }
4.Work.scala
work接受消息之后,new DriverRunner() 调用该对象的M-start
case LaunchDriver(driverId, driverDesc) => logInfo(s"Asked to launch driver $driverId") val driver = new DriverRunner( conf, driverId, workDir, sparkHome, driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), self, workerUri, securityMgr) drivers(driverId) = driver driver.start()
5.DriverRunner.scala
该对象中,M-start 中new 了一个线程,调用prepareAndRunDriver 最后通过 ProcessBuilder调用 DriverWrapper 的main(step2中的)
private[worker] def start() = { new Thread("DriverRunner for " + driverId) { override def run() { var shutdownHook: AnyRef = null try { shutdownHook = ShutdownHookManager.addShutdownHook { () => logInfo(s"Worker shutting down, killing driver $driverId") kill() } // prepare driver jars and run driver val exitCode = prepareAndRunDriver() // set final state depending on if forcibly killed and process exit code finalState = if (exitCode == 0) { Some(DriverState.FINISHED) } else if (killed) { Some(DriverState.KILLED) } else { Some(DriverState.FAILED) } } catch { case e: Exception => kill() finalState = Some(DriverState.ERROR) finalException = Some(e) } finally { if (shutdownHook != null) { ShutdownHookManager.removeShutdownHook(shutdownHook) } } // notify worker of final driver state, possible exception worker.send(DriverStateChanged(driverId, finalState.get, finalException)) } }.start() } private[worker] def prepareAndRunDriver(): Int = { val driverDir = createWorkingDirectory() val localJarFilename = downloadUserJar(driverDir) def substituteVariables(argument: String): String = argument match { case "{{WORKER_URL}}" => workerUrl case "{{USER_JAR}}" => localJarFilename case other => other } // TODO: If we add ability to submit multiple jars they should also be added here val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) runDriver(builder, driverDir, driverDesc.supervise) } private[worker] def prepareAndRunDriver(): Int = { val driverDir = createWorkingDirectory() val localJarFilename = downloadUserJar(driverDir) def substituteVariables(argument: String): String = argument match { case "{{WORKER_URL}}" => workerUrl case "{{USER_JAR}}" => localJarFilename case other => other } // TODO: If we add ability to submit multiple jars they should also be added here val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) runDriver(builder, driverDir, driverDesc.supervise) } 6.DriverWrapper.scala (粗粒度Driver client) 开始调用用户指定 jar 和main 真正开始执行我们所写的代码 def main(args: Array[String]) { args.toList match { /* * IMPORTANT: Spark 1.3 provides a stable application submission gateway that is both * backward and forward compatible across future Spark versions. Because this gateway * uses this class to launch the driver, the ordering and semantics of the arguments * here must also remain consistent across versions. */ case workerUrl :: userJar :: mainClass :: extraArgs => val conf = new SparkConf() val host: String = Utils.localHostName() val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf)) logInfo(s"Driver address: ${rpcEnv.address}") rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl)) val currentLoader = Thread.currentThread.getContextClassLoader val userJarUrl = new File(userJar).toURI().toURL() val loader = if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) { new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader) } else { new MutableURLClassLoader(Array(userJarUrl), currentLoader) } Thread.currentThread.setContextClassLoader(loader) setupDependencies(loader, userJar) // Delegate to supplied main class val clazz = Utils.classForName(mainClass) val mainMethod = clazz.getMethod("main", classOf[Array[String]]) mainMethod.invoke(null, extraArgs.toArray[String]) rpcEnv.shutdown() case _ => // scalastyle:off println System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]") // scalastyle:on println System.exit(-1) } }
以上是关于Spark-Luanch Driver的主要内容,如果未能解决你的问题,请参考以下文章