spark 01
- debug environment: Scala, sbt, Git Bash, Eclipse Scala IDE plugin
- spark-shell:
1 spark-shell → spark-submit → spark-class (with SparkSubmit as the class to run)
2 open the JVM → thread dump → main: SparkSubmit.main → repl.Main → SparkILoop.process: (initializeSpark → createSparkContext)
3 SparkContext: (1 Utils.getCallSite, 2 markPartiallyConstructed(this, config.getBoolean("spark.driver.allowMultipleContexts", false)), 3) (a driver-side sketch follows this list)
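A minimal driver-side sketch of what the shell's createSparkContext amounts to, assuming a 1.x build; the app name and master here are placeholders rather than what spark-shell actually passes:

    import org.apache.spark.{SparkConf, SparkContext}

    object ShellLikeDriver {
      def main(args: Array[String]): Unit = {
        // spark-shell derives these from its command-line options; placeholders here
        val conf = new SparkConf()
          .setAppName("shell-like-driver")
          .setMaster("local[*]")
        val sc = new SparkContext(conf)   // runs the construction steps noted in item 3
        println(s"Spark version: ${sc.version}")
        sc.stop()
      }
    }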
- SparkConf:()
- SparkEnv:(1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager)
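A small sketch, assuming a running 1.x driver, of how values are read back from SparkConf and how the SparkEnv services listed above are reached once a SparkContext exists:

    import org.apache.spark.{SparkConf, SparkEnv}

    val conf = new SparkConf().set("spark.executor.memory", "2g")

    // typed getters with defaults, the same pattern markPartiallyConstructed uses above
    val allowMultiple = conf.getBoolean("spark.driver.allowMultipleContexts", false)
    val execMem       = conf.get("spark.executor.memory", "1g")

    // SparkEnv holds the per-JVM services; it is only populated after a SparkContext is created
    Option(SparkEnv.get).foreach(env => println(env.blockManager.blockManagerId))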
- Driver Programme
1 SparkConf->SparkEnv
2 TaskScheduler
3 dagScheduler.start()
4 ui.start()
- job :
1 SparkContext: create the instance sc
2 RDD: use sc to create a new RDD
3 apply a transformation to the RDD to get a different RDD
4 action: an action on the RDD calls sc.runJob (see the sketch after this list)
5 sc.runJob → dagScheduler.runJob → submitJob; submitJob creates a JobSubmitted event and sends it to eventProcessActor
6 eventProcessActor.processEvent
7 submitStage: the job is split into stages; get the dependencies and the final stage, then submit the final stage and run
8 submitMissingTasks
9 TaskScheduler::submitTasks: assign tasks to workers
10 TaskSchedulerImpl creates the backend according to the mode; the backend receives ReviveOffers from TaskSchedulerImpl
11 ReviveOffers → executor.launchTask → TaskRunner.run
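A minimal sketch of steps 1-4: nothing runs until the action, at which point sc.runJob hands the job to the DAGScheduler (app name, master and data are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc  = new SparkContext(new SparkConf().setAppName("job-demo").setMaster("local[*]"))
    val rdd = sc.parallelize(1 to 1000, 4)   // step 2: create an RDD from sc
    val doubled = rdd.map(_ * 2)             // step 3: transformation, evaluated lazily
    val n = doubled.count()                  // step 4: action -> sc.runJob -> dagScheduler.runJob
    println(s"count = $n")
    sc.stop()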
- transformation of RDD (see the sketch after this list)
1 HadoopRDD -> MappedRDD: textFile, RDD.scala
2 -> FlatMappedRDD: flatMap, RDD.scala
3 -> MappedRDD: splittedText.map, RDD.scala
4 -> PairRDDFunctions: reduceByKey, in /src/PairRDDFunctions.scala
5 -> ShuffledRDD
6 -> MapPartitionsRDD
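The lineage above matches the classic WordCount; a minimal sketch (the input path is a placeholder), with the RDD produced at each step in the comments:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // pair-RDD implicits, needed on older 1.x versions

    val sc = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
    val textFile     = sc.textFile("input.txt")               // 1: HadoopRDD -> MappedRDD
    val splittedText = textFile.flatMap(_.split(" "))         // 2: FlatMappedRDD
    val wordPairs    = splittedText.map(word => (word, 1))    // 3: MappedRDD
    val wordCounts   = wordPairs.reduceByKey(_ + _)           // 4-6: reduceByKey -> ShuffledRDD -> MapPartitionsRDD
    wordCounts.collect().foreach(println)
    sc.stop()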
- executor:
1 executorMemory: conf.getOption(...).orElse(...).getOrElse(...) (sketched after this list)
  taskScheduler
  heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
  dagScheduler
2 taskScheduler.start()
3
4
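A hedged sketch of the executorMemory resolution in step 1, assuming conf: SparkConf is in scope; the environment-variable fallback, the 512 MB default, and the parsing helper are paraphrased assumptions rather than the exact 1.x code:

    // stand-in for Spark's internal memory-string parser ("2g" -> 2048, "512m" -> 512)
    def memoryStringToMb(s: String): Int = {
      val lower = s.toLowerCase
      if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
      else if (lower.endsWith("m")) lower.dropRight(1).toInt
      else lower.toInt
    }

    val executorMemoryMb: Int =
      conf.getOption("spark.executor.memory")           // explicit setting wins
        .orElse(sys.env.get("SPARK_EXECUTOR_MEMORY"))   // assumed env-var fallback
        .map(memoryStringToMb)
        .getOrElse(512)                                 // assumed built-in default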
- TaskScheduler: implements FIFO & FAIR scheduling (config sketch after this list)
1 SchedulerBackend
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem), AppClientListener, Logging
1 AppClient
2 maxCores
3 start: (1 driverUrl, 2 args, 3 extraJavaOpts, 4 classPathEntries, 5 libraryPathEntries)
4 sparkJavaOpts, javaOpts, command, appDesc
5 new AppClient
6 client.start(), waitForRegistration()
!! the client's configuration includes:
1 sc.env.actorSystem, masters
2 appName, maxCores, sc.executorMemory, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir),
  command("org.apache.spark.executor.CoarseGrainedExecutorBackend", sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts),
  args(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
2 start(): calls backend.start(); if spark.speculation is enabled:
  sc.env.actorSystem.scheduler.schedule(
    SPECULATION_INTERVAL milliseconds, SPECULATION_INTERVAL milliseconds) { checkSpeculatableTasks() }
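Both behaviours are driven by standard settings; a minimal sketch of turning them on via SparkConf:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.scheduler.mode", "FAIR")   // FIFO is the default scheduling mode
      .set("spark.speculation", "true")      // enables the checkSpeculatableTasks timer shown above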
- client.AppClientListener: a trait with five callbacks (sketched after this list)
1 connected to the cluster
2 temporary disconnection, e.g. on master failure
3 unrecoverable application failure
4 add an Executor
5 remove an Executor
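A paraphrased sketch of the trait behind the five callbacks above; the method names follow the 1.x deploy client, but exact signatures may differ between versions, so this is written as a stand-in trait:

    // stand-in for org.apache.spark.deploy.client.AppClientListener (paraphrased)
    trait AppClientListenerSketch {
      def connected(appId: String): Unit                    // 1: registered with the cluster
      def disconnected(): Unit                              // 2: temporary loss of the master
      def dead(reason: String): Unit                        // 3: unrecoverable application failure
      def executorAdded(fullId: String, workerId: String,
                        hostPort: String, cores: Int, memory: Int): Unit    // 4
      def executorRemoved(fullId: String, message: String,
                          exitStatus: Option[Int]): Unit                    // 5
    }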
- AppClient: val ClientActor (register, re-register)
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterApplication(appDescription)
1 RegisteredApplication
2 ApplicationRemoved from the master, terminate
3 ExecutorAdded
4 ExecutorUpdated: whether the executor is complete
5 MasterChanged from the new master
6 StopAppClient from AppClient::stop()
- case RegisterApplication (handled by the Master)
1 logInfo
2 val app
ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores); the driver here is the AppClient's actor
3 registerApplication(app): save the app's info on the master
4 persistenceEngine.addApplication(app)
5 schedule()
- schedule
1
Notation used in these notes:
  →       call or next step
  :       definition or content
  :()     including
  ()      parameter or input
  (tab)   explanation
  &       at the same time with
  ?       not sure
  ??      serious problem
  →(sth)  sth is a condition or parameter
  !!      conclusion or attention