Spark---读取csvjson文件

Posted Shall潇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark---读取csvjson文件相关的知识,希望对你有一定的参考价值。

CSV文件

package file

import org.apache.spark.sql.types.{DoubleType, IntegerType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/*
* 读取CSV文件
* */

object CSVDemo {
  def main(args: Array[String]): Unit = {
    
    //sparkContext
//    val conf = new SparkConf().setMaster("local[*]").setAppName("CSV")
//    val sc = SparkContext.getOrCreate(conf)
//    val lines = sc.textFile("in/users.csv")

    //第一种加载并去除表头方式
    /*
    val line2 = lines.mapPartitionsWithIndex((index, value) => {
      if (index == 0)
        value.drop(0)   //去除第一行
      else
        value
    })
    line2.map(_.split(","))foreach(x=>println(x.mkString(",")))

    */

    //第二种加载并去表头方式
    /*
    lines.filter(_.startsWith("user_id")== false).map(_.split(",")).foreach(x=>println(x.mkString(",")))
    */

    //sparkSession方式
    val spark = SparkSession.builder().appName("readCSV").master("local[*]").getOrCreate()
    val df:DataFrame = spark.read.format("csv").option("header", "true")
      .load("in/users.csv")
//    df.printSchema()     //查看表头
//    df.show(10)        //只显示10行
//    df.select("user_id","birthyear").show(10)
    val df2 = df.select("user_id","birthyear")
    val df3 = df2.withColumn("birthyear",df2("birthyear").cast(IntegerType))  //类型转换
    df3.printSchema()
    df3.filter(x=>{!x.isNullAt(1) && x.getInt(1) < 1995}).show()
  }
}

JSON文件

package file

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/*
* JSON文件
* */

object JsonDemo {
  def main(args: Array[String]): Unit = {
    //sparkContext
    /*
    val conf = new SparkConf().setMaster("local[*]").setAppName("json")
    val sc = SparkContext.getOrCreate(conf)
    val lines = sc.textFile("in/users.json")
    import scala.util.parsing.json.JSON   //专门的处理JSON文件
    val rdd = lines.map(x=>JSON.parseFull(x))
    rdd.collect().foreach(println)
    */
    //sparkSession
    val spark = SparkSession.builder().appName("readJSON").master("local[*]").getOrCreate()
    val frame = spark.read.format("json").load("in/user.json")
    frame.printSchema()
    frame.show()
  }
}

以上是关于Spark---读取csvjson文件的主要内容,如果未能解决你的问题,请参考以下文章

黑马程序员《数据清洗》学习笔记CSVJSON数据抽取

在这个 spark 代码片段中 ordering.by 是啥意思?

python+spark程序代码片段

csv转存json,csvjson读写

Spark:仅当路径存在时才读取文件

在spark udf中读取hdfs上的文件