val rdd = sc.parallelize(List(1,2,3,4,5,6),第二参数) 这里的第二参数 获取方式有两种: 1.直接给定值,根据传入的值决定分区的数量 2.根据运行环境获取分区数量(core) -->例如 本地运行 设置为local 此时设置分区值默认分区就是1个
val rdd = sc.textFile(path: String, minPartitions: Int = defaultMinPartitions) 读取文件中内容算子中有两个参数 第一个参数是获取数据路径 这个理第二个参数,第二参数决定了分区的数量有两种情况 1.在不传递值的情况,使用是默认defaultMinPartitions --> 这个值时多少? 2.在传递分区数量是时候,这个分区值是多少
第一条主线 --> defaultMinParititons值时多少? 1.先从textFile这个算子入手,进入后台源码 def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) } 2.在不传入分区数值的情况下,默认textFile中使用了一个值defaultMinPartitions,这个值就决定了分区数量,查看这个值 def defaultMinPartitions: Int = math.min(defaultParallelism, 2) 发现defaultMinPartitions,并不是一个值而是一个方法,在这个方法中实现是一个math比较最小值 这个比较中有一共值时固定是 2 这个值,和2比较时有一个全新的参数defaultParallelism,需要查看这个参数
3.继续拆安defaultParallelism这个值的时候发现他也是一个方法 def defaultParallelism: Int = { assertNotStopped() taskScheduler.defaultParallelism } 方法最后一句是整个方法的返回一直,也就是说这个方法获取值,是最后一句产生,这个产生值还触发了一个TaskScheduler(任务调度),此时defaultParallelism 当查看这个方法的时候发现这个方法并没有实现体,这个方法是存在在特质中 def defaultParallelism(): Int ctrl+alt+ 左右 回到之前调用或下一次调用(必须知道实现者是谁) 在触发抽方法的位置 --> ctrl+atl+鼠标左键-->就可以查看实现这个方法或触发这个方法的类 发现这个抽象方法实现类 --> TaskSchedulerImpl在这个类中有方法的实现
4.TaskSchedulerImpl在这个类中有方法的实现 override def defaultParallelism(): Int = backend.defaultParallelism() 发现原来抽象方法已经被重写了,并且有一个实现,此时只需要触发defaultParallelism就可以触发出这个值多少了 但是,点击查看后发现也是一个抽象方法defaultParallelism() --> 对这个实现在此查询实现者即可
5.查看defaultParallelism() 此时发现实现方式有两种 1.CoarseGrainedSchedulerBackend spark集群模式 2.LocalSchedulerBackend 本地模式 我们查看是集群模式,结果发现了 override def defaultParallelism(): Int = { conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) } 这个方法中比较的是最大值,其中第一个采纳数是totalCoreCount即集群核心数 第二个参数固是2
结论: 在调用textFile算子的时候,初始默认分区数量是2,除非小于2,否则默认分区数量就是2个
第二条主线 --> 查看分区计算流程 问题:先阶段已经知道分区数量默认是2个分区,具体分区中计算方式时候什么样式?(分片逻辑)
1.还是在textFile这个算子实现中 已经知道分区数量之后,查看内部对分区数量的使用,需要查看方法的实现 hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) 分区参数传入到一个叫做 hadoopFile中,所以此时就需要查看hadoopFile是谁
2.查看hadoopFile def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped()
// This is a hack to enforce loading hdfs-site.xml. // See SPARK-11227 for details. FileSystem.getLocal(hadoopConfiguration)
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) //核心出现在这个位置,这里创建了一个HadoopRDD对象 new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }
3.查看HadoopRDD中存在哪些操作? 在这里类中 override def getPartitions: Array[Partition] = { val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) val inputFormat = getInputFormat(jobConf) val inputSplits = inputFormat.getSplits(jobConf, minPartitions) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } getPartitions是切片方法的触发 val inputSplits = inputFormat.getSplits(jobConf, minPartitions) 这个方法是具体的切分 val array = new Array[Partition](inputSplits.size) 就是获取分片个数 需要查看getSplits 4.查看 getSplits方法 这个方法是接口中抽象方法,此时需要使用 ctrl+atl+鼠标左边 查看这个方法的实现 一般处理数据方式都是 FileInputFormat类中查看 getSplits方法 这个方法和MR中切片放啊其实逻辑是一样的,核心位置 long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); } // totalSize 获取文件的大小
Spark中和MR中切片最大的不同位置出现了,Spark会计算切片大小 long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); 启动numSplits就是之前minPartitions即 默认分区值
最终切片的位置依旧保留着MR中思想即 1.1冗余 long splitSize = computeSplitSize(goalSize, minSize, blockSize); //这里会计算真正切片的大小
long bytesRemaining = length; //文件大小 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) //切片逻辑 //切完一片之后 会减去切片大小 bytesRemaining -= splitSize;
总结:分区数量其实是可以影响最最终文件的个数,但是在最终输出界过之前,会执行分片处理,这个分片才是最终输出分区的个数,我们若需要影响最终输出值,此时可以在最终输出算子之前调用 repartition 来修改分区
|