spark sc.textFile() 指定换行符

Posted jason-dong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark sc.textFile() 指定换行符相关的知识,希望对你有一定的参考价值。

直接上代码

package com.jason.spark23

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

object WriteTest {
  implicit class ContextExtensions(val sc: SparkContext) extends AnyVal {
    def textFile(
                  path: String,
                  delimiter: String,
                  maxRecordLength: String = "1000000"
                ): RDD[String] = {

      val conf = new Configuration(sc.hadoopConfiguration)

      // This configuration sets the record delimiter:
      conf.set("textinputformat.record.delimiter", delimiter)
      // and this one limits the size of one record:
      conf.set("mapreduce.input.linerecordreader.line.maxlength", maxRecordLength)

      sc.newAPIHadoopFile(
        path,
        classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
        conf
      )
        .map { case (_, text) => text.toString }
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("readtest")
      .master("local")
      .getOrCreate()
    import spark.implicits._
    /*val pathjson = "C:\notos\code\sparktest\src\main\resources\employees.json"
    println("====json df") //jsondf 会自动给schema设置类型
    val jsonDf = spark.read.json(pathjson)
    jsonDf.show()
    //jsonDf.write.format("text").save("C:\notos\code\sparktest\src\main\resources\text")
    jsonDf.rdd.saveAsTextFile("")*/

    val pathtxt = "C:\notos\code\sparktest\src\main\resources\people2.txt"
    val dd = spark.read.option("textinputformat.record.delimiter","||").format("text").load(pathtxt)
    dd.show()
    dd.rdd.collect.foreach(println)
    val sc = spark.sparkContext
    val people2 = sc.textFile(pathtxt,"||")
    people2.collect().foreach(println)
    spark.stop()
  }
}

这里使用了scala 中的隐式转换,当调用sc.textFile(path,delimiter)时 sc会被自动包装成ContextExtensions ,并调用其textFile 方法

以上是关于spark sc.textFile() 指定换行符的主要内容,如果未能解决你的问题,请参考以下文章

Spark RDD 操作实战之文件读取

spark配置

spark 可以直接向hdfs 输入数据吗

Spark运行流程

Spark 使用 sc.textFile("s3a://bucket/filePath") 读取 s3。 java.lang.NoSuchMethodError:com.amazo

spark简单例子