Spark源码解读--spark.textFile()读取流程

Posted 大数据小知识

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark源码解读--spark.textFile()读取流程相关的知识,希望对你有一定的参考价值。

  • 大家都知道RDD的算子分两类:
    Transform就是RDD的转换,从一个RDD转化到另一个RDD(也有多个的情况)。
    Action则是出发实际的执行动作。
  • 所以解读spark.textFile()源码将不涉及Action操作,即,仅仅会记录一些RDD信息
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")

    val sc = SparkContext.getOrCreate(sparkConf)

    val rdd = sc.textFile("inputdata/*")
    //或者可设置分区值 spark 分区数 依据其两者之间最大的分区数来计算出
    //val rdd = sc.textFile("inputdata/*",6)
    
    //val collect = rdd.collect()
  • 第一步:textFile方法将会调用 hadoopFile方法
  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   * @param path path to the text file on a supported file system
   * @param minPartitions suggested minimum number of partitions for the resulting RDD
   * @return RDD of lines of the text file
   */
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }
  • 第二步: 在hadoopFile中,我们发现 最终仅仅new HadoopRDD,就结束 返回了;
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()

    // This is a hack to enforce loading hdfs-site.xml.
    // See SPARK-11227 for details.
    FileSystem.getLocal(hadoopConfiguration)

    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }
  • 第三步: 当执行runjob任务时 可发现,会依据传入的分区数或者用默认的最小分区数2 来split
  override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    try {
        //获取分区 文件所在块位置 大小
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
      case e: InvalidInputException if ignoreMissingFiles =>
        logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
            s" partitions returned from this path.", e)
        Array.empty[Partition]
    }
  }
  • 第三步: getSplits 方法
  • org.apache.hadoop.mapred.FileInputFormat#getSplits

  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    StopWatch sw = new StopWatch().start();
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }
        依据文件大小进行Splits
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
    ......
    
     long splitSize = computeSplitSize(goalSize, minSize, blockSize);

第四步: compute 使用 RecordReader 行读取 LineRecordReader 获取文本数据

 override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {

    ......
    
    reader =
        try {
          inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
    ......
    //判断是否读取完数据
     override def getNext(): (K, V) = {
        try {
          finished = !reader.next(key, value)
        } catch {
          case e: FileNotFoundException if ignoreMissingFiles =>
            logWarning(s"Skipped missing file: ${split.inputSplit}", e)
            finished = true
          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
          case e: FileNotFoundException if !ignoreMissingFiles => throw e
          case e: IOException if ignoreCorruptFiles =>
            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
            finished = true
        }
        if (!finished) {
          inputMetrics.incRecordsRead(1)
        }
        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
          updateBytesRead()
        }
        (key, value)
      }
    
    //  inputFormat.getRecordReader调用的方法
    
 public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit, JobConf job,
                                          Reporter reporter)
    throws IOException {
    
    reporter.setStatus(genericSplit.toString());
    String delimiter = job.get("textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter) {
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    }
    return new LineRecordReader(job, (FileSplit) genericSplit,
        recordDelimiterBytes);
  }


以上是关于Spark源码解读--spark.textFile()读取流程的主要内容,如果未能解决你的问题,请参考以下文章

spark textFile 困惑与解释

(python) Spark .textFile(s3://...) access denied 403 with valid credentials

Spark-1.6.0中的Sort Based Shuffle源码解读

Spark学习之路 (十六)SparkCore的源码解读spark-submit提交脚本

Spark学习之路 (十六)SparkCore的源码解读spark-submit提交脚本

Spark学习之路 (十六)SparkCore的源码解读spark-submit提交脚本[转]