sparkSQL创建DataFrame

Posted zzhangyuhang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sparkSQL创建DataFrame相关的知识,希望对你有一定的参考价值。

首先我们要创建SparkSession

val spark = SparkSession.builder()
                        .appName("test")
                        .master("local")
                        .getOrCreate()
import spark.implicits._ //将RDD转化成为DataFrame并支持SQL操作        

然后我们通过SparkSession来创建DataFrame

1.使用toDF函数创建DataFrame

 通过导入(importing)spark.implicits, 就可以将本地序列(seq), 数组或者RDD转为DataFrame。

 只要这些数据的内容能指定数据类型即可。

import spark.implicits._
val df = Seq(
  (1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
  (2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
).toDF("id", "name", "created_time")

注意:如果直接用toDF()而不指定列名字,那么默认列名为"_1", "_2"

可以通过df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")进行修改列名

2.使用createDataFrame函数创建DataFrame

通过schema + row 来创建

我们可以通俗的理解为schema为表的表头,row为表的数据记录

import org.apache.spark.sql.types._
//定义dataframe的结构的schema
val schema = StructType(List(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true),
    StructField("create_time", DateType, nullable = true)
))
//定义dataframe内容的rdd
val rdd = sc.parallelize(Seq(
  Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
  Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
))
//创建dataframe
val df = spark.createDataFrame(rdd, schema)

不过,我们可以把文件结构当做参数来使用,通过rdd自动产生schema和row,不用自己手动生成。

import org.apache.spark.sql.types._

//传入属性参数
val schemaString = " id name create_time" 
//解析参数变成StructField
val fields = schemaString.split(" ")
                         .map(fieldName => StructField(fieldname, StringType, nullable = true))
//定义dataframe的结构的schema
val schema = StructType(fields)

//定义dataframe内容的rdd
val lines = sc.textFile("file:///people.txt")
val rdd = lines.spilt(_.split(","))
               .map(attributes=>ROW(attributes(0),attributes(1).trim) )

//创建dataframe
val df = spark.createDataFrame(rdd, schema)       

3.通过反射机制创建DataFrame

首先要定义一个case class,因为只有case class才能被Spark隐式转化为DataFrame

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._
//创建匹配类
case class Person(id:Int,name:String,age:Long)
//读取文件生成rdd
val rdd = sc.textFile("file:///")
//通过匹配类把rdd转化成dataframe
val df = rdd.map(_.split(","))
            .map(attributes => Person(attributes(0),attributes(1),attributes(2).trim.toInt)) .toDF()  

4.通过文件直接创建DataFrame

 (1)使用parquet文件read创建  

val df = spark.read.parquet("hdfs:/path/to/file")

 (2)使用json文件read创建

val df = spark.read.json("examples/src/main/resources/people.json")

 (3)使用csv文件load创建

val df = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true") //reading the headers
        .option("mode", "DROPMALFORMED")
        .load("csv/file/path")

 (4)使用Hive表创建

spark.table("test.person") // 库名.表名 的格式
     .registerTempTable("person")  // 注册成临时表
spark.sql(
      """
        | select *
        | from person
        | limit 10
      """.stripMargin).show()

记得,最后我们要调用spark.stop()来关闭SparkSession。  

5.保存

 (1)通过df.write.format().save("file:///")保存

  write.format()支持输出的格式有 JSON、parquet、JDBC、orc、csv、text等文件格式

  ,save()定义保存的位置

  当我们保存成功后可以在保存位置的目录下看到文件,但是这个文件并不是一个文件而是一个目录。

  里面的内容一般为

  

  不用担心,这是没错的。

  我们读取的时候,并不需要使用文件夹里面的part-xxxx文件,直接读取目录即可。

 (2)通过df.rdd.saveAsTextFile("file:///")转化成rdd再保存

我们对于不同格式的文件读写来说,我们一般使用两套对应方式

val df = spark.read.格式("file:///")//读取文件
df.write.格式("file:///")//保存文件
val df = spark.read.format("").load("file:///")//读取文件
df.write.save("file:///")//保存文件

具体read和load方法有什么不同,我还不是很清楚,弄明白了回来补充。

6.通过JDBC创建DataFrame 

我们在启动Spark-shell或者提交任务的时候需要添加相应的jar包

spark-shell(spark-submit)

--jars /usr/local/spark/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \\

--driver-class-path /usr/local/spark/mysql-connector-java-5.1.40-bin.jar

val jdbcDf = spark.read.format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")   //驱动
    .option("url", "jdbc:mysql://ip:3306")  //数据库地址
    .option("dbtable", "db.user_test") //表名:数据库名.表名
    .option("user", "test") //用户名
    .option("password", "123456")  //密码
    .load()
jdbcDf.show()

 

以上是关于sparkSQL创建DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL数据源

SparkSQL 核心编程

SparkSQL详解

回顾IDEA 开发 SparkSQL 基础编程

SparkSQL

Spark-SQL——DataFrame与Dataset