SparkSQL详解
Posted 阿德小仔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL详解相关的知识,希望对你有一定的参考价值。
目录
RDD转换为DataFrame&DataFrame转换为RDD
DataFrame转DataSet&DataSet转DataFrame
SparkSQL概述
什么是Spark SQL
Spark SQL是 Spark 用来处理结构化数据的一个模块,它提供了 2 个编程抽象:DataFrame 和 DataSet,并且作为分布式 SQL 查询引擎的作用。Spark SQL特点
(1)、易整合 将SQL查询与Spark程序无缝混合 (2)、统一的数据访问方式 以相同的方式连接到任何数据源 (3)、兼容 Hive 在现有仓库上运行SQL或HiveQL查询 (4)、 标准的数据连接 通过JDBC或ODBC连接什么是DataFrame
与 RDD 类似,DataFrame 也是一个分布式数据容器。然而 DataFrame 更像传统数据库的二维表 格,除了数据以外,还记录数据的结构信息,即 schema。同时,与 Hive 类似,DataFrame 也支 持嵌套数据类型(struct、array 和 map)。从 API 易用性的角度上看,DataFrame API 提供的是 一套高层的关系操作,比函数式的 RDD API 要更加友好,门槛更低。RDD与DataFrame区别:
例子: users . join ( events , users ( "id" ) === events ( "uid" )). filter ( events ( "date" ) > "2022- 01-01" )什么是DataSet
DataSet是分布式数据集合。DataSet是Spark1.6中添加的一个新抽象,是DataFrame的一个扩 展。它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及SparkSQL优化执行引 擎的优点。DataSet也可以使用功能性的转换(操作map,flatMap,filter等等)。 (1)、是Dataframe API 的一个扩展,是 Spark 最新的数据抽象。 (2)、用户友好的API 风格,既具有类型安全检查也具有 Dataframe 的查询优化特性。 (3)、Dataset 支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效 率。 (4)、样例类被用来在 Dataset 中定义数据的结构信息,样例类中每个属性的名称直接映射到 DataSet 中的字段名称。 (5)、Dataframe 是Dataset 的特列,DataFrame=Dataset[Row] ,所以可以通过 as 方法将 Dataframe转换为 Dataset。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的表结构信 息我都用 Row 来表示。(6)、DataSet 是强类型的。比如可以有 Dataset[Car],Dataset[Person]。 (7)、DataFrame 只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在 编译的时候检查是否类型失败的,比如你可以对一个 String 进行减法操作,在执行的时候才报 错,而 DataSet 不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟 JSON 对 象和类对象之间的类比。SparkSQL编程
新的起始点
在老的版本中,SparkSQL 提供两种 SQL 查询起始点:一个叫 SQLContext,用于 Spark 自己提 供的 SQL 查询;一个HiveContext,用于连接Hive 的查询。 SparkSession 是 Spark 最新的 SQL 查询起始点,实质上是 SQLContext 和 HiveContext 的组 合,所以在 SQLContext 和HiveContext 上可用的 API 在 SparkSession 上同样是可以使用的。 SparkSession 内部封装了 sparkContext,所以计算实际上是由 sparkContext 完成的。DataFrame
SparkSQL的DataFrame API允许我们使用DataFrame而不用必须去注册临时表或者生产SQL 表达式。DataFrame API既有transformation操作也有action操作。创建
在 Spark SQL 中 SparkSession 是创建DataFrame 和执行 SQL 的入口,创建 DataFrame 有三种方式: (1)、通过 Spark 的数据源进行创建; (2)、从一个存在的 RDD 进行转换; (3)、还可以从 Hive Table 进行查询返回。 从 Spark 数据源进行创建//1 、查看 Spark 数据源进行创建的文件格式 //spark.read. //2 、读取 json 文件创建 DataFrame // 创建一个 user.json 文件 val df = spark.read.json("/usr/local/soft/data/user.json") //3 、展示结果 df.show df.createTempView("user") spark.sql("select * from user").show注意:如果从内存中获取数据,Spark可以知道数据类型具体是什么。如果是数字,默 认作为Int处理;但是从文件中读取的数字,不能确定是什么类型,索引用bigint接收, 可以和Long类型转换,但是和Int不能进行转换。
- 从一个存在的 RDD 进行转换(后面讲解)
- 还可以从 Hive Table 进行查询返回(后面讲解)
SQL语法
SQL语法风格是指我们查询数据的时候使用SQL语句来查询,这种风格的查询必须要有 临时视图或者全局视图来辅助。 (1)、读取JSON文件创建DataFrameval df = spark.read.json("/usr/local/soft/data/user.json")(2)、对DataFrame创建一个临时表
df.createOrReplaceTempView("people")注意:View和table的区别,View是不能修改的,只能查,而table可以增删改查。 (3)、通过SQL语句实现查询全表
spark.sql("select * from people").show(4)、结果展示
df.show注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时 表。使用全局临时表时需要全路径访问,如:global_temp.people
spark.newSession.sql("select * from people").show 报错(5) 、对于 DataFrame 创建一个全局表
df.createOrReplaceGlobalTempView("people") df.createGlobalTempView("people") spark.newSession.sql("select * from global_temp.people").show
DSL语法
DataFrame提供一个特定领域语言(DSL)去管理结构化的数据。可以在Scala、Java、 Python和R中使用DSL,使用DSL语法风格不必去创建临时视图了。 (1)、创建一个DataFrameval df = spark.read.json("/usr/local/soft/data/user.json")(2)、查看DataFrame 的 Schema 信息
df.printSchem(3)、只查看“age”列数据
//df. //tab 键查看 df 中的方法 df.select("age").show()(4)、查看”username”列数据以及”age+1”数据
df.select("age" + 1).show() // 报错 df.select($"age" + 1).show() df.select('age + 1).show() df.select($"username",$"age" + 1).show() df.select($"username",$"age" + 1 as "newage").show() // 取别名注意:涉及到运算的时候,每列都必须使用$,或者采用引号表达式:单引号+字段名 (5)、查看“age”大于“30”的数据
df.filter($"age" > 20).show() df.filter($"age" > 30).show()(6)、按照“age”分组,查看数据条数
df.groupBy("age").count.show()
RDD转换为DataFrame&DataFrame转换为RDD
在IDE中开发时,如果需要 RDD 与DF 或者 DS 之间操作,那么都需要引入 import spark.implicits._ 这里的spark不是scala中的包名,而是创建的sparkSession对象中的变量名称,所以必 须先创建SparkSession对象再导入。这里的spark对象不能使用var声明,因为Scala只 支持val修饰的对象的引入。 注意:spark-shell中无需导入,自动完成此操作。val rdd = sc.makeRDD(List(1,2,3,4)) //rdd. //tab 键查看 rdd 所有的方法 val df = rdd.toDF("id") df.show() //df. //tab 键查看 df 所有的方法 df.rdd
DataSet
DataSet是具有强类型的数据集合,需要提供对应的类型信息。创建DataSet
(1)、使用样例类序列创建DataSet//case class Perso(name:String,age:Long) val list = List(Perso("zhangsan",30),Person("lisi",20)) val ds = list.toDS ds.show
DataFrame转DataSet&DataSet转DataFrame
//df // 查看是否存在 df //case class Emp(age:Long,username:String) //val ds = df.as[Emp] //ds.show() //ds. //tab 键查看方法 //ds.toDF
RDD转DataSet&DataSet转RDD
val rdd = sc.makeRDD(List(Emp(30,"zhangsan"),Emp(20,"lisi"))) rdd.toDS // 前提条件:必须是样例类把类型准备好 val rdd = sc.makeRDD(List(1,2,3,4)) rdd.toDS val rdd = sc.makeRDD(List(Emp(30,"zhangsan"),Emp(20,"lisi"))) val ds = rdd.toDS val rdd1 = ds.rdd
RDD、DataFrame、DataSet三者的关系
在SarpkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。版本区别
版本来看: Spark1.0 ---->RDD Spark1.3 ---->DataFrame Spark1.6 ---->DataSet 如果同样的数据都给到这三个数据结构,他们分别计算后,都会给出相同的结果。不同的是 他们的执行效率和执行方式。在后期的Spark版本中,DataSet有可能会逐步取代RDD和 DataFrame成为唯一的API接口。三者的共性
1、RDD、DataFrame、Dataset 全都是spark平台下的分布式弹性数据集,为处理超大型数 据提供便利。 2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到 Action如foreach时,三者才会开始遍历运算。 3、三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存 溢出。 4、三者都有partition的概念。 5、三者有许多共同的函数,如filter,排序等。 6、在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持:import spark.implicits._。 7、DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型。三者的区别
1.RDD: (1)、RDD 一般和 spark mlib 同时使用 (2)、RDD 不支持 sparksql 操作 2.DataFrame: (1)、与 RDD 和 Dataset 不同,DataFrame 每一行的类型固定为 Row,每一列的值没法直接 访问,只有通过解析才能获取各个字段的值。 (2)、DataFrame 与 Dataset 一般不与 spark mlib 同时使用。 (3)、DataFrame 与 Dataset 均支持 sparksql 的操作,比如 select,groupby 之类,还能注 册临时表/视窗,进行 sql 语句操作。 (4)、DataFrame 与 Dataset 支持一些特别方便的保存方式,比如保存成 csv,可以带上表 头,这样每一列的字段名一目了然。 3.Dataset: (1)、Dataset 和DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。 (2)、DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row,不解析,每一行究竟有哪些 字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性中的第七 条提到的模式匹配拿出特定字段。而 Dataset 中,每一行是什么类型是不一定的,在自定义 了 case class 之后可以很自由的获得每一行的信息。IDEA创建SparkSQL程序
实际开发中,都是使用IDEA中进行开发的。IDEA 中程序的打包和运行方式都和 SparkCore 类似,Maven 依赖中需要添加新的依赖项:<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --> <dependency> <groupId> org.apache.spark </groupId> <artifactId> spark-sql_2.12 </artifactId> <version> 2.4.5 </version> </dependency>
package com.shujia.core.sql
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame, Dataset, Row, SparkSession
object Demo01_SparkSql
def main(args: Array[String]): Unit =
//TODO 创建SparkSQL的运行环境
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("SparkSql")
// new SparkSession() //表示没有构造器在这可以访问
//原因:构造方法私有化,所以在外面不能直接访问
val spark: SparkSession =
SparkSession.builder().config(sparkConf).getOrCreate()
//builder()构建器构建对象,使用getOrCreate()构建,获取或创建//注意:创建环境的时候需要告知连的是什么,通过config()将配置对象传进去
//在使用DataFrame时,如果设计到转换操作,需要引入转换规则
import spark.implicits._
//TODO 执行逻辑操作
//DataFrame
// val df: DataFrame = spark
// .read
// .json("data/user.json")
// df.show()
//DataFrame =>SQL
// df.createOrReplaceTempView("user")
//
// spark.sql(
// """
// |select * from user
// |""".stripMargin)
// .show()
//
// spark.sql(
// """
// |select age from user
// |""".stripMargin)
// .show()
//
// spark.sql(
// """
// |select avg(age) from user
// |""".stripMargin)
// .show()
//DataFrame =>DSL
//在使用DataFrame时,如果设计到转换操作,需要引入转换规则
// import spark.implicits._
// df.select("age","username").show
// df.select($"age"+ 1 ).show
// df.select('age + 1).show
//TODO DataSet
//DataFrame其实就是特定泛型到DataSet
// val seq: Seq[Int] = Seq(1, 2, 3, 4)
// val ds: Dataset[Int] = seq.toDS()
// ds.show()
//RDD <=> DataFrame
val rdd: RDD[(Int, String, Int)] =
spark.sparkContext.makeRDD(List((1,"zhangsan",20),(2,"lisi",21),
(1,"wangwu",23)))
val df: DataFrame = rdd.toDF("id","name","age")
df.show()
val rdd1: RDD[Row] = df.rddrdd1.foreach(println)
//DataFrame <=> DataSet
val ds: Dataset[User] = df.as[User]
ds.show()
val df1: DataFrame = ds.toDF("id","name","age")
df1.show()
//RDD <=> DataSet
//DataSet:有数据有类型有结构
val ds2: Dataset[User] = rdd.map
case (id, name, age) =>
User(id, name, age)
.toDS()
ds2.show()
val rdd2: RDD[User] = ds2.rdd
rdd2.foreach(println)
//TODO 关闭环境
spark.close()
case class User (id:Int,name:String,age:Int)
用户自定义函数
用户可以通过spark.udf功能添加自定义函数,实现自定义功能。 UDF1、创建DataFrame
package com.shujia.core.sql
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame, Dataset, Row, SparkSession
object Demo02_SparkSql_UDF
def main(args: Array[String]): Unit =
//TODO 创建SparkSQL的运行环境
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("SparkSql")
val spark: SparkSession =
SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
//TODO 执行逻辑操作
val df: DataFrame = spark.read.json("data/user.json")
df.createOrReplaceTempView("user")// spark.sql(
// """
// |select age,"name" + username from user
// |""".stripMargin)
// .show()
spark.udf.register("prefixName",(name:String) =>
"name:"+ name
)
spark.sql(
"""
|select age,prefixName(username) from user
|""".stripMargin)
.show()
//TODO 关闭环境
spark.close()
强类型的DataSet和弱类型的DataFrame都提供了相关的聚合函数,如count(),
countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继
承UserDefinedAggregateFunction来实现用户自定义弱类型聚合函数。从Spark3.0版本后
UserDefinedAggregateFunction已经不推荐使用了,可以统一采用强类型聚合函数
Aggregator。
2、用户自定义聚合函数(弱类型)
package com.shujia.core.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.MutableAggregationBuffer,
UserDefinedAggregateFunction
import org.apache.spark.sql.types.DataType, LongType, StructField,
StructType
import org.apache.spark.sql.DataFrame, Row, SparkSession
object Demo03_SparkSql_UDAF
def main(args: Array[String]): Unit = //TODO 创建SparkSQL的运行环境
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("SparkSql")
val spark: SparkSession =
SparkSession.builder().config(sparkConf).getOrCreate()
//TODO 执行逻辑操作
val df: DataFrame = spark.read.json("data/user.json")
df.createOrReplaceTempView("user")
spark.udf.register("ageAvg",new MyAvgUDAF)
spark.sql(
"""
|select ageAvg(age) from user
|""".stripMargin)
.show()
//TODO 关闭环境
spark.close()
/**
* 自定义聚合函数类:计算年龄的平均值
* 1、继承UserDefinedAggregateFunction
* 2、重写方法(8)
*
*/
class MyAvgUDAF extends UserDefinedAggregateFunction
//输入数据的结构 IN
override def inputSchema: StructType =
StructType(
Array(
StructField("age",LongType)
)
)
//缓冲区的结构 Buffer
override def bufferSchema: StructType =
StructType(
Array(
//所有年龄的和
StructField("total",LongType) ,
//所有年龄出现的次数
StructField("count",LongType)
)
)
//函数计算结果的数据类型:Out
override def dataType: DataType = LongType//函数的稳定性
override def deterministic: Boolean = true
//缓冲区初始化
override def initialize(buffer: MutableAggregationBuffer): Unit =
buffer.update(0,0L)
buffer.update(1,0L)
//根据输入的值更新缓冲区数据
override def update(buffer: MutableAggregationBuffer, input: Row):
Unit =
//buffer.getLong(0)缓冲区之前的值 + input.getLong(0)输入的值
buffer.update(0,buffer.getLong(0) + input.getLong(0))
buffer.update(1,buffer.getLong(1) + 1)
//缓冲区数据合并
override def merge(buffer1: MutableAggregationBuffer, buffer2:
Row): Unit =
//x y
//1 2 3 4
//x+y===>x
//x y
//3 3 4
//x+y===>x
//x y
//6 4
//x+y===>x
buffer1.update(0,buffer1.getLong(0) + buffer2.getLong(0))
buffer1.update(1,buffer1.getLong(1) + buffer2.getLong(1))
//计算平均值
override def evaluate(buffer: Row): Any =
buffer.getLong(0) / buffer.getLong(1)
(用户自定义聚合函数(Spark3.0.0以下版本的强类型)
package com.shujia.core.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator,
MutableAggregationBuffer, UserDefinedAggregateFunction
import org.apache.spark.sql.types.DataType, LongType, StructField,
StructType
import org.apache.spark.sql.DataFrame, Dataset, Encoder, Encoders,
Row, SparkSession, TypedColumn
object Demo03_SparkSql_UDAF2 def main(args: Array[String]): Unit =
//TODO 创建SparkSQL的运行环境
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("SparkSql")
val spark: SparkSession =
SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
//TODO 执行逻辑操作
val df: DataFrame = spark.read.json("data/user.json")
//在Spark3.0.0以下的版本,也就是我们所用的版本是不能在sql中使用强类型UDAF操作
//SQL & DSL
//Spark3.0.0以下的版本,强类型UDAF聚合函数使用DSL语法操作
val ds: Dataset[User] = df.as[User]
//将UDAF函数转为查询的列对象
val udaf: TypedColumn[User, Long] = new MyAvgUDAF().toColumn
//查询
ds.select(udaf).show()
//TODO 关闭环境
spark.close()
/**
* 自定义聚合函数类:计算年龄的平均值
* 1、继承UserDefinedAggregateFunction
* 2、重写方法(8)
*
*/
case class User(username:String,age:BigInt)
/**
* 自定义聚合函数类:计算年龄的平均值
* 1、继承 org.apache.spark.sql.expressions.Aggregator,定义泛型
* IN:输入的数据类型 User
* BUF:缓冲区的数据类型
* OUT:输出的数据类型 Long
* 2、重写方法(6)
*
*/
case class Buff(var total:Long,var count:Long)
class MyAvgUDAF extends Aggregator[User,Buff,Long]
// z & zero都为初始值或零值
//缓冲区的初始化
override def zero: Buff =
Buff(0L,0L)
//根据输入的数据更新缓冲区数据override def reduce(buff: Buff, in: User): Buff =
buff.total = buff.total + in.age.toInt
buff.count = buff.count + 1
buff
//合并缓冲区
override def merge(buff1: Buff, buff2: Buff): Buff =
buff1.total = buff1.total + buff2.total
buff1.count = buff1.count + buff2.count
buff1
//计算结果
override def finish(buff: Buff): Long =
buff.total / buff.count
//缓冲区的编码操作
override def bufferEncoder: Encoder[Buff] = Encoders.product
//输出的编码操作
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
用户自定义聚合函数(spark3.0.0以上版本的强类型)
注意:
spark3.0.0
一下没有
functions.udaf
,想要使用需要去修改
pom
文件中
spark
版本。
package com.shujia.core.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.DataFrame, Encoder, Encoders,
SparkSession, functions
object Demo04_SparkSql_UDAF1
def main(args: Array[String]): Unit =
//TODO 创建SparkSQL的运行环境
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("SparkSql")
val spark: SparkSession =
SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
//TODO 执行逻辑操作
val df: DataFrame = spark.read.json("data/user.json")
df.createOrReplaceTempView("user")spark.udf.register("ageAvg", functions.udaf(new MyAvgUDAF))
spark.sql(
"""
|select ageAvg(age) from user
|""".stripMargin)
.show()
//TODO 关闭环境
spark.close()
/**
* 自定义聚合函数类:计算年龄的平均值
* 1、继承 org.apache.spark.sql.expressions.Aggregator,定义泛型
* IN:输入的数据类型 Long
* BUF:缓冲区的数据类型
* OUT:输出的数据类型 Long
* 2、重写方法(6)
*
*/
case class Buff(var total:Long,var count:Long)
class MyAvgUDAF extends Aggregator[Long,Buff,Long]
// z & zero都为初始值或零值
//缓冲区的初始化
override def zero: Buff =
Buff(0L,0L)
//根据输入的数据更新缓冲区数据
override def reduce(buff: Buff, in: Long): Buff =
buff.total = buff.total + in
buff.count = buff.count + 1
buff
//合并缓冲区
override def merge(buff1: Buff, buff2: Buff): Buff =
buff1.total = buff1.total + buff2.total
buff1.count = buff1.count + buff2.count
buff1
//计算结果
override def finish(buff: Buff): Long =
buff.total / buff.count
//缓冲区的编码操作
override def bufferEncoder: Encoder[Buff] = Encoders.product//输出的编码操作
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
数据的加载和保存
通用的加载和保存方式
SparkSQL提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根 据不同的参数读取和保存不同格式的数据,SparkSQL默认读取和保存的文件格式为 parquet。 (1)加载数据spark.read.load 是加载数据的通用方法 spark . read . //tab 键查看方法如果读取不同格式的数据,可以对不同的数据格式进行设定。
spark . read . format ( "..." )[. option ( "..." )]. load ( "..." )1、format("..."):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。 2、load("..."):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。 3、option("..."):在"jdbc"格式下需要传入JDBC相应的参数,url、user、password和dbtable我们前面都适用read API先把文件加载到DataFrame然后再查询,其实我们也可以直接在文件上进行查询:
// 文件格式 .` 文件路径 ` // 上传 user.json 到 HDFS 上 //hadoop fs -put user.json /data spark . sql ( "select * from json.`data/user.json`" ). show(2) 保存数据
df.write.save 是保存数据的通用方法 df . write . //tab 键查看方法 df . write . format ( "json" ). save ( "/data/output1" )如果保存不同格式的数据,可以对不同的数据格式进行设定。
df . write . format ( "..." )[. option ( "..." )]. save ( "..." )1、format("..."):指定加载的数据类型,包 括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。 2、load("..."):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传 入加载数据的路径。 3、option("..."):在"jdbc"格式下需要传入JDBC相应的参数,url、user、password和 dbtable保存操作可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设 置。 注意:这些SaveMode都是没有加锁的,也不是原子操作。 SaveMode是一个枚举类,其中的常量包括:
df . write . mode ( "append" ). json ( "data/output1" ) df . write . mode ( "overwrite" ). json ( "data/output1" ) df . write . mode ( "ignore" ). json ( "data/output1" )
parquet
SparkSQL的默认数据源为parquet格式。parquet是一种能够有效存储嵌套数据的列式存储 格式。 数据源为parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format。修改 配置项spark.sql.sources.default,可以修改默认数据源格式。 (1)加载数据// 上传文件到 HDFS //hadoop fs -put users.parquet /data val df = spark . read . load ( "/data/users.parquet" ) df . show(2)保存数据
val df = spark . read . json ( "data/user.json" ) df . write . mode ( "append" ). save ( "/data/output2" )
JSON
SparkSQL能够自动推测JSON数据集的结构,并将它加载为一个DataSet[ROW]。可以通过 SparkSession.read.json()去加载JSON文件。 注意:Spark读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串。格式如 下:"name" : "zhangsan" "name" : "lisi" , "age" : 21 "name" : "wangwu" , "age" : 24 , "name" : "zhaoliu" , "age" : 23(1)导入隐式转换
import spark . implicits . _(2)加载JSON文件
val path = "data/user.json" val userDF = spark . read . json ( path )(3)创建临时表
userDF . createOrReplaceTempView ( "user" )(4)数据查询
spark . sql ( "select * from user" ). show
CSV
SparkSQL可以配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列 。// 上传文件到 HDFS //hadoop fs -put people.csv /data val df_csv = spark . read . format ( "csv" ). option ( "seq" , ";" ). option ( "inferSchema" , "true" ) . option ( "header" , "true" ). load ( "/data/people.csv" )
MySQL
SparkSQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对 DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。如果使用spark-shell操 作,可以在启动shell时指定相关的数据库驱动路径或者将相关的数据库驱动放到Spark的类 路径下 。bin / spark - shell -- jars mysql - connector - java - 5.1 . 27 - bin . jar我们在IDEA中通过JDBC对Mysql进行操作。 (1)导入依赖
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --> <dependency> <groupId> mysql </groupId> <artifactId> mysql-connector-java </artifactId> <version> 5.1.49 </version> </dependency>(2)读取数据
package com.shujia.core.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql._
object Demo05_SparkSql_JDBC
def main(args: Array[String]): Unit =
//TODO 创建SparkSQL的运行环境
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("SparkSql")
val spark: SparkSession =
SparkSession.builder().config(sparkConf).getOrCreate()
//TODO 执行逻辑操作
//读取mysql数据
val df: DataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://master:3306/dtt_data")
//注意mysql5.7用:com.mysql.jdbc.Driver
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "student")
.load()
// df.show()
df.write
.format("jdbc")
.option("url","jdbc:mysql://master:3306/dtt_data")
.option("driver","com.mysql.jdbc.Driver")
.option("user","root")
.option("password","123456")
.option("dbtable","student_1")
.mode(SaveMode.Append)
.save()
//TODO 关闭环境
spark.close()
Hive
ApacheHive是Hadoop上的SQL引擎,SparkSQL编译时可以包含Hive支持,也可以不包含。 包含Hive支持的SparkSQL可以支持Hive访问、UDF(用户自定义函数)以及 Hive 查询 语言 (HiveQL/HQ以上是关于SparkSQL详解的主要内容,如果未能解决你的问题,请参考以下文章