这里记录pyspark的执行逻辑图
代码版本为 spark 2.2.0
1.执行逻辑
这里简述一下代码调用流程
1. 用户通过spark-submit提交python代码,spark-submit检测到此次提交任务类型是python类型则会设置mainClass为PythonRunner
2. PythonRunner 会负责(通过process的方式)执行用户的python代码,并且启动py4j的gatewayServer 把启动端口通过环境变量通告到python进程中
3. 用户代码启动sparkContext 通过py4j 初始化jvm中的sparkContext
4. jvm中的sparkContext开始执行,将dagScheduler,taskScheduler等初始化,启动executor等角色
5. 用户代码执行rdd的action(pyspark的action都是jvm中collect触发的)动作,runJob开始真正执行任务,通过对应的Task开始PythonRDD的执行
6. executor执行PythonRDD的compute,compute中创建对应的python进程执行pyspark.daemon,并且建立socket用于跟python进行数据传输
7. socket建立完成之后启动writerThread线程将PythonRDD中从jvm中产生的父RDD开始执行,将数据写入到socket,写入完成后socket.flush
8. pyspark.daemon 调用pyspark.work 从socket中读取要执行的python函数和数据,开始真正的数据处理逻辑
9. 数据处理完成之后将处理结果写回socket,jvm中通过PythonRDD的read方法读取,并返回结果
10. 最终executor将PythonRDD的执行结果上报到drive上,返回给用户
具体执行逻辑图和框架说明看这个博客整理的内容,其中逻辑图画的很明确,这里不再赘述,直接引用他的链接
2.计算代码到执行逻辑的映射
rdd = sc.textFile(....)
#rdd = RDD(_jrdd = HadoopRDD)
#生成的RDD是通过py4j调用jvm中对应的方法产生的rdd,并且用python 代码中的RDD进行包装而成
#最初的RDD是数据源的RDD,此次是HadoopRDD,这个hadoopRDD最终在上一小节第7步中被执行
rdd1 = rdd.map(lambda x: x.split(" "))
#rdd1 = PipelinedRDD(_jrdd = PythonRDD(parent=HadoopRDD,PythonFunction(lambda)))
#此处的rdd是python中PipelinedRDD对上一个生成的RDD和用户提供的方法进行整合的RDD,类型是jvm中的PythonRDD
#PipelinedRDD将父rdd进行传递,保证数据源的RDD始终都是新RDD的父RDD
rdd2 = rdd1.flatMap(lambda x: [(i,1) for i in x])
#rdd2 = PipelinedRDD(_jrdd = PythonRDD(parent=HadoopRDD,PythonFunction(func(_,lambda))))
rdd3 = rdd2.reduceByKey(lambda x,y: x + y)
#rdd3 = PipelinedRDD(_jrdd = PythonRDD(parent=HadoopRDD,PythonFunction(mergeCombiners)))
#无论进行多少次PipelinedRDD的转换,始终都是对应在PythonRDD上,将父rdd传递到此,将方法一层一层嵌套传进来
#保证最重PythonRDD被调用的时候父rdd是数据源rdd,pythonFunction是用户的所有处理逻辑
rdd4 = rdd3.collect()
#rdd4 = [(word,Int)]
#此处生成的最终的结果,在collect的时候调用jvm中对应的PythonRDD.collectAndServer开始提交任务执行
#任务执行结果被传回jvm,上报到drive,最重返回给用户