Spark-SQL——DataFrame与Dataset
Posted Xsqone
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark-SQL——DataFrame与Dataset相关的知识,希望对你有一定的参考价值。
文章目录
一、Spark SQL概述
1.1、Spark SQL是什么?
Spark SQL是Apache Spark中用于结构化数据处理的模块。它允许开发人员在Spark上执行SQL查询、处理结构化数据以及将它们与常规的RDD一起使用。Spark Sql提供了用于处理结构化数据的高级API,如DataFrames和Datasets,它们比原始的RDD API更加高效和方便。
通过Spark SQL,可以使用标准的SQL语言进行数据处理,也可以使用DataFrame API进行操作。此外,Spark SQL还可以集成Hive,允许用户直接查询Hive表并将查询结果存储到Hive中。Spark SQL也支持Spark中运行标准的JDBC/ODBC连接,从而可以使用各种外部数据源。
1.2、Hive和Spark SQL
Hive和Spark SQL都是用来处理大数据的工具,主要是基于Hadoop生态圈。它们的相同点都是用来查询和处理大规模数据的,而且都可以使用类SQL语言来进行操作。
不同之处:
- 操作语言不同:Hive使用HQL(Hive Query Language)进行数据操作,而Spark SQL使用Spark SQL语法进行数据操作。
- 数据处理方式不同:Hive依赖于MapReduce作为计算引擎,而Spark SQL使用Spark作为计算引擎。由于Spark的内存计算能力较强,所以在某些场景下,Spark SQL比Hive更加高效。
- 数据存储方式不同:Hive使用HDFS或者其他支持Hadoop HDFS API的存储系统来存储数据,而Spark SQL可以支持多种不同的数据存储系统,例如:HDFS、Hive等。
- 性能不同:Spark SQL的性能要比Hive快得多,主要是因为Spark Sql使用了内存计算技术,而Hive使用的是MapReduce计算模型。
- 执行计划不同:Hive的执行计划是通过HQL生成的,而Spark Sql的执行计划是通过Spark的优化器生成的。Spark的优化器可以对查询进行优化,以提高查询的性能。
1.3、DataFrame与DataSet
- DataFrame
- DataFrame是Spark SQL的一种数据抽象,它表示分布式数据集合。DataFrame和关系型数据库中的表类似,都有列和行的概念,而且还具备了分布式的特性。DataFrame提供了丰富的数据操作接口,例如:选择、过滤、分组、聚合、排序、连接等。这些操作和SQL语句类似,便于使用和理解。
- DataFrame可以看作是由一系列Row对象组成的RDD,每个Row代表着一行数据。不同于RDD,DataFrame具有数据类型的概念,这意味着DataFrame的每一列都有其对应的数据类型,这一特性使得Spark能够进行更为智能的优化,例如通过列式存储来提升查询效率。同时DataFrame的API使用了强类型语言的特性,使得在编译期就能够捕获到许多错误,提高了开发效率。
- Dataset
- Dataset是Spark中的一种强类型API,它结合了Spark SQL中的DataFrame和RDD的优点。
- Dataset API提供了编译时类型安全和面向对象的编程接口,并利用Spark SQL的优化执行引擎来提高查询性能。它可以使用Scala、Java和Python等语言进行操作,支持多种数据源,如Parquet、JSON、JDBC等。
- Dataset API还提供了对关系型数据和面向对象数据的支持,可将其转换为Dataset,通过统一的编程接口进行处理和操作。
- 二者不同:
- 数据类型:DataFrame 是一种以列为基础的分布式数据集合,强调数据的结构,即 schema,可以使用 SQL 或者 DataFrame 的 API 进行操作;Dataset 是一种类型化的分布式数据集合,强调类型的安全性,可以使用强类型的 API 进行操作。
- 类型检查:DataFrame 只有在运行时才会进行类型检查,可能在运行时出现类型错误;Dataset 利用 Spark 强大的编译器来捕捉类型错误,可以在编译时就发现类型错误。
- API:DataFrame 的 API 是基于 DataFrame 和 SQL 的 API,使用较为灵活,但是对于类型的检查较为弱;Dataset 的 API 是基于 Scala 和 Java 的编程语言,可以保证类型的安全性,但是使用的时候会相对复杂一些。
- 编译时优化:Dataset 可以受益于 Spark 的 Catalyst 查询优化器,在编译时进行优化,可以提高性能;DataFrame 只有在运行时才能够获得优化。
- 语言支持:DataFrame 可以使用 Scala,Java,Python 和 R 等编程语言,比 Dataset 支持的编程语言更多。Dataset 只支持 Scala 和 Java。
- 性能:由于 Dataset 强制类型检查,所以可以获得更好的性能,因此在需要高性能的场景下推荐使用 Dataset。但是对于某些特定场景下,DataFrame 可能会更适合。
二、Spark SQL编程
Spark Core 中,如果想要执行应用程序,需要首先构建上下文环境对象 SparkContext,Spark SQL 其实可以理解为对 Spark Core 的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。
SparkSession 是 Spark 最新的 SQL 查询起始点,实质上是 SQLContext 和 HiveContext的组合,所以在 SQLContext 和 HiveContext 上可用的 API 在 SparkSession 上同样是可以使用的。SparkSession 内部封装了 SparkContext,所以计算实际上是由 sparkContext 完成的。当我们使用 spark-shell 的时候, spark 框架会自动的创建一个名称叫做 spark 的 SparkSession 对象, 就像我们以前可以自动获取到一个 sc 来表示 SparkContext 对象一样
2.1、DataFrame
2.1.1、创建DataFrame
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从HiveTable进行查询返回。
- 从Spark数据源进行创建
- Spark支持的创建文件的数据源格式:
- 创建文件user.json文件
“username”:“zhangsan”,“age”:20 - 读取json文件创建DataFrame
scala> spark.read.json("file:///opt/stufile/user.json") res2: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
- 结果
scala> res2.show() +---+--------+ |age|username| +---+--------+ | 20|zhangsan| +---+--------+
- Spark支持的创建文件的数据源格式:
- RDD与DataFrame互转
- 在IDEA中开发程序时,如果需要RDD与DF或者DS之间进行互相操作,那么需要引入 import spark.implicits._
- 在spark-shell中无需导入,自动完成此操作
- 创建样例类
scala> case class User(name:String,age:Int) defined class User
- 创建RDD
sc.makeRDD(List(("zhangsan",30),("lisi",20))) res4: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at makeRDD at <console>:25
- 转换DataFrame
scala> res5.map(t => User(t._1,t._2)).toDF res7: org.apache.spark.sql.DataFrame = [name: string, age: int] scala> res7.show +--------+---+ | name|age| +--------+---+ |zhangsan| 30| | lisi| 20| +--------+---+
- 转换为RDD
scala> res7.rdd res9: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[21] at rdd at <console>:26
2.1.2、SQL语法
SQL语法风格是指我们查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。
df.createOrReplaceTempView(“people”) 创建临时表(创建或重写临时表)
df.createOrReplaceGlobalTempView(“people”) 创建全局表(创建或重写全局表)
**注意:**普通临时表是Session范围内的,如果想要应用范围内有效,可以使用全局临时表。使用全局临时表是需要全路径访问,前面加上global_temp. 例如:select * from global_temp.people
- 读取JSON文件创建DataFrame
scala> val df = spark.read.json("file:///opt/stufile/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
- 对DataFrame创建一个临时表
scala> df.createOrReplaceTempView("people")
- 通过SQL语句实现查询全表
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 结果展示
scala> sqlDF.show
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+
- 创建全局表
scala> df.createOrReplaceGlobalTempView("people")
- 查询全表
scala> spark.sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+
scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+
2.1.3、DSL语法
DataFrame提供一个特定领域语言(domain-specific language,DSL)去管理结构化的数据。可以在Scala、Java、Python等语言中使用DSL,使用DSL语法风格不必去创建临时表。
- 创建一个DataFrame
scala> val df = spark.read.json("file:///opt/stufile/user.json") df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
- 查看DataFrame的Schema信息
scala> df.printSchema root |-- age: long (nullable = true) |-- username: string (nullable = true)
- 只查看username列数据
scala> df.select("username").show() +--------+ |username| +--------+ |zhangsan| | lisi| | wangwu| +--------+
- 查看username列数据以及age+1数据
scala> df.select($"username",$"age" + 1).show scala> df.select('username, 'age + 1).show() scala> df.select('username, 'age + 1 as "newage").show() +--------+---------+ |username|(age + 1)| +--------+---------+ |zhangsan| 21| | lisi| 31| | wangwu| 41| +--------+---------+
- 查看age大于30的数据
scala> df.filter($"age">30).show +---+---------+ |age| username| +---+---------+ | 40| wangwu| +---+---------+
- 按照age分组,查看数据条数
scala> df.groupBy("age").count.show +---+-----+ |age|count| +---+-----+ | 20| 1| | 30| 1| | 40| 1| +---+-----+
2.2、Dataset
DataSet是具有强类型的数据集合,需要提供对于的类型信息。
2.2.1、创建DataSet
- 使用样例类序列创建Dataset
scala> case class Person(name: String, age: Long) defined class Person scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS() caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long] scala> caseClassDS.show +---------+---+ | name|age| +---------+---+ | zhangsan| 2| +---------+---+
- 使用基本类型的序列创建Dataset
scala> val ds = Seq(1,2,3,4,5).toDS ds: org.apache.spark.sql.Dataset[Int] = [value: int] scala> ds.show +-----+ |value| +-----+ | 1| | 2| | 3| | 4| | 5| +-----+
2.2.2、RDD与Dataset互转
-
RDD转换Dataset
SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。
scala> case class User(name:String, age:Int) defined class User scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS res0: org.apache.spark.sql.Dataset[User] = [name: string, age: int] scala> res0.show +--------+---+ | name|age| +--------+---+ |zhangsan| 30| | lisi| 49| +--------+---+
-
Dataset转换为RDD
scala> case class User(name:String, age:Int) defined class User scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1,t._2)).toDS res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int] scala> val rdd = res11.rdd rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25 scala> rdd.collect res12: Array[User] = Array(User(zhangsan,30), User(lisi,49))
2.2.3、DataFrame 和 DataSet 转换
- DataFrame 转换为 DataSet
scala> case class User(name:String, age:Int) defined class User scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age") df: org.apache.spark.sql.DataFrame = [name: string, age: int] scala> val ds = df.as[User] ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
- DataSet 转换为 DataFrame
scala> val ds = df.as[User] ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int] scala> val df = ds.toDF df: org.apache.spark.sql.DataFrame = [name: string, age: int]
Spark-SQL之DataFrame操作大全
Spark SQL中的DataFrame类似于一张关系型数据表。在关系型数据库中对单表或进行的查询操作,在DataFrame中都可以通过调用其API接口来实现。可以参考,Scala提供的DataFrame API。
本文中的代码基于Spark-1.6.2的文档实现。
一、DataFrame对象的生成
Spark-SQL可以以其他RDD对象、parquet文件、json文件、Hive表,以及通过JDBC连接到其他关系型数据库作为数据源来生成DataFrame对象。本文将以MySQL数据库为数据源,生成DataFrame对象后进行相关的DataFame之上的操作。
文中生成DataFrame的代码如下:
object DataFrameOperations {
def main (args: Array[String ]) {
val sparkConf = new SparkConf().setAppName( "Spark SQL DataFrame Operations").setMaster( "local[2]" )
val sparkContext = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sparkContext)
val url = "jdbc:mysql://m000:3306/test"
val jdbcDF = sqlContext.read.format( "jdbc" ).options(
Map( "url" -> url,
"user" -> "root",
"password" -> "root",
"dbtable" -> "spark_sql_test" )).load()
val joinDF1 = sqlContext.read.format( "jdbc" ).options(
Map("url" -> url ,
"user" -> "root",
"password" -> "root",
"dbtable" -> "spark_sql_join1" )).load()
val joinDF2 = sqlContext.read.format( "jdbc" ).options(
Map ( "url" -> url ,
"user" -> "root",
"password" -> "root",
"dbtable" -> "spark_sql_join2" )).load()
... ...
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
后续代码都在上面... ...
处。
二、DataFrame对象上Action操作
1、show
:展示数据
以表格的形式在输出中展示jdbcDF
中的数据,类似于select * from spark_sql_test
的功能。
show
方法有四种调用方式,分别为,
(1)show
只显示前20条记录。
示例:
jdbcDF.show
- 1
- 1
结果:
(2)show(numRows: Int)
显示numRows
条
示例:
jdbcDF.show(3)
- 1
- 1
结果:
(3)show(truncate: Boolean)
是否最多只显示20个字符,默认为true
。
示例:
jdbcDF.show(true)
jdbcDF.show(false)
- 1
- 2
- 1
- 2
结果:
(4)show(numRows: Int, truncate: Boolean)
综合前面的显示记录条数,以及对过长字符串的显示格式。
示例:
jdbcDF.show(3, false)
- 1
- 1
结果:
2、collect
:获取所有数据到数组
不同于前面的show
方法,这里的collect
方法会将jdbcDF
中的所有数据都获取到,并返回一个Array
对象。
jdbcDF.collect()
- 1
- 1
结果如下,结果数组包含了jdbcDF
的每一条记录,每一条记录由一个GenericRowWithSchema
对象来表示,可以存储字段名及字段值。
3、collectAsList
:获取所有数据到List
功能和collect
类似,只不过将返回结构变成了List
对象,使用方法如下
jdbcDF.collectAsList()
- 1
- 1
结果如下,
4、describe(cols: String*)
:获取指定字段的统计信息
这个方法可以动态的传入一个或多个String
类型的字段名,结果仍然为DataFrame
对象,用于统计数值类型字段的统计值,比如count, mean, stddev, min, max
等。
使用方法如下,其中c1
字段为字符类型,c2
字段为整型,c4
字段为浮点型
jdbcDF .describe("c1" , "c2", "c4" ).show()
- 1
- 1
结果如下,
5、first, head, take, takeAsList
:获取若干行记录
这里列出的四个方法比较类似,其中
(1)first
获取第一行记录
(2)head
获取第一行记录,head(n: Int)
获取前n行记录
(3)take(n: Int)
获取前n行数据
(4)takeAsList(n: Int)
获取前n行数据,并以List
的形式展现
以Row
或者Array[Row]
的形式返回一行或多行数据。first
和head
功能相同。
take
和takeAsList
方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生OutOfMemoryError
使用和结果略。
二、DataFrame对象上的条件查询和join等操作
以下返回为DataFrame类型的方法,可以连续调用。
1、where条件相关
(1)where(conditionExpr: String)
:SQL语言中where关键字后的条件
传入筛选条件表达式,可以用and
和or
。得到DataFrame类型的返回结果,
示例:
jdbcDF .where("id = 1 or c1 = ‘b‘" ).show()
- 1
- 1
结果,
(2)filter
:根据字段进行筛选
传入筛选条件表达式,得到DataFrame类型的返回结果。和where
使用条件相同
示例:
jdbcDF .filter("id = 1 or c1 = ‘b‘" ).show()
- 1
- 1
结果,
2、查询指定字段
(1)select
:获取指定字段值
根据传入的String
类型字段名,获取指定字段的值,以DataFrame类型返回
示例:
jdbcDF.select( "id" , "c3" ).show( false)
- 1
- 1
结果:
还有一个重载的select
方法,不是传入String
类型参数,而是传入Column
类型参数。可以实现select id, id+1 from test
这种逻辑。
jdbcDF.select(jdbcDF( "id" ), jdbcDF( "id") + 1 ).show( false)
- 1
- 1
结果:
能得到Column
类型的方法是apply
以及col
方法,一般用apply
方法更简便。
(2)selectExpr
:可以对指定字段进行特殊处理
可以直接对指定字段调用UDF函数,或者指定别名等。传入String
类型参数,得到DataFrame对象。
示例,查询id
字段,c3
字段取别名time
,c4
字段四舍五入:
jdbcDF .selectExpr("id" , "c3 as time" , "round(c4)" ).show(false)
- 1
- 1
结果,
(3)col
:获取指定字段
只能获取一个字段,返回对象为Column类型。
val idCol = jdbcDF.col(“id”)果略。
(4)apply
:获取指定字段
只能获取一个字段,返回对象为Column类型
示例:
val idCol1 = jdbcDF.apply("id")
val idCol2 = jdbcDF("id")
- 1
- 2
- 1
- 2
结果略。
(5)drop
:去除指定字段,保留其他字段
返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。
示例:
jdbcDF.drop("id")
jdbcDF.drop(jdbcDF("id"))
- 1
- 2
- 1
- 2
结果:
3、limit
limit
方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take
与head
不同的是,limit
方法不是Action操作。
jdbcDF.limit(3).show( false)
- 1
- 1
结果,
4、order by
(1)orderBy
和sort
:按指定字段排序,默认为升序
示例1,按指定字段排序。加个-
表示降序排序。sort
和orderBy
使用方法相同
jdbcDF.orderBy(- jdbcDF("c4")).show(false)
// 或者
jdbcDF.orderBy(jdbcDF("c4").desc).show(false)
- 1
- 2
- 3
- 1
- 2
- 3
结果,
示例2,按字段字符串升序排序
jdbcDF.orderBy("c4").show(false)
- 1
- 1
结果,
(2)sortWithinPartitions
和上面的sort
方法功能类似,区别在于sortWithinPartitions
方法返回的是按Partition排好序的DataFrame对象。
5、group by
(1)groupBy
:根据字段进行group by
操作
groupBy
方法有两种调用方式,可以传入String
类型的字段名,也可传入Column
类型的对象。
使用方法如下,
jdbcDF .groupBy("c1" )
jdbcDF.groupBy( jdbcDF( "c1"))
- 1
- 2
- 1
- 2
(2)cube
和rollup
:group by的扩展
功能类似于SQL
中的group by cube/rollup
,略。
(3)GroupedData对象
该方法得到的是GroupedData
类型对象,在GroupedData
的API中提供了group by
之后的操作,比如,
max(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段min(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段mean(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段sum(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段-
count()
方法,获取分组中的元素个数运行结果示例:
count
max
这里面比较复杂的是以下两个方法,
agg
,该方法和下面介绍的类似,可以用于对指定字段进行聚合操作。
pivot
6、distinct
(1)distinct
:返回一个不包含重复记录的DataFrame
返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()
方法不传入指定字段时的结果相同。
示例:
jdbcDF.distinct()
- 1
- 1
结果,
(2)dropDuplicates
:根据指定字段去重
根据指定字段去重。类似于select distinct a, b
操作
示例:
jdbcDF.dropDuplicates(Seq("c1"))
- 1
- 1
结果:
7、聚合
聚合操作调用的是agg
方法,该方法有多种调用方式。一般与groupBy
方法配合使用。
以下示例其中最简单直观的一种用法,对id
字段求最大值,对c4
字段求和。
jdbcDF.agg("id" -> "max", "c4" -> "sum")
- 1
- 1
结果:
8、union
unionAll
方法:对两个DataFrame进行组合
类似于SQL
中的UNION ALL
操作。
示例:
jdbcDF.unionALL(jdbcDF.limit(1))
- 1
- 1
结果:
9、join
重点来了。在SQL
语言中用得很多的就是join
操作,DataFrame中同样也提供了join
的功能。
接下来隆重介绍join
方法。在DataFrame中提供了六个重载的join
方法。
(1)、笛卡尔积
joinDF1.join(joinDF2)
- 1
- 1
(2)、using
一个字段形式
下面这种join类似于a join b using column1
的形式,需要两个DataFrame中有相同的一个列名,
joinDF1.join(joinDF2, "id")
- 1
- 1
joinDF1
和joinDF2
根据字段id
进行join
操作,结果如下,using
字段只显示一次。
(3)、using
多个字段形式
除了上面这种using
一个字段的情况外,还可以using
多个字段,如下
joinDF1.join(joinDF2, Seq("id", "name"))
- 1
- 1
(4)、指定join
类型
两个DataFrame的join
操作有inner, outer, left_outer, right_outer, leftsemi
类型。在上面的using
多个字段的join情况下,可以写第三个String
类型参数,指定join
的类型,如下所示
joinDF1.join(joinDF2, Seq("id", "name"), "inner")
- 1
- 1
(5)、使用Column
类型来join
如果不用using
模式,灵活指定join
字段的话,可以使用如下形式
joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"))
- 1
- 1
结果如下,
(6)、在指定join
字段同时指定join
类型
如下所示
joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"), "inner")
- 1
- 1
10、获取指定字段统计信息
stat
方法可以用于计算指定字段或指定字段之间的统计信息,比如方差,协方差等。这个方法返回一个DataFramesStatFunctions
类型对象。
下面代码演示根据c4
字段,统计该字段值出现频率在30%
以上的内容。在jdbcDF
中字段c1
的内容为"a, b, a, c, d, b"
。其中a
和b
出现的频率为2 / 6
,大于0.3
jdbcDF.stat.freqItems(Seq ("c1") , 0.3).show()
- 1
- 1
结果如下:
11、获取两个DataFrame中共有的记录
intersect
方法可以计算出两个DataFrame中相同的记录,
jdbcDF.intersect(jdbcDF.limit(1)).show(false)
- 1
- 1
结果如下:
12、获取一个DataFrame中有另一个DataFrame中没有的记录
示例:
jdbcDF.except(jdbcDF.limit(1)).show(false)
- 1
- 1
结果如下,
13、操作字段名
(1)withColumnRenamed
:重命名DataFrame中的指定字段名
如果指定的字段名不存在,不进行任何操作。下面示例中将jdbcDF
中的id
字段重命名为idx
。
jdbcDF.withColumnRenamed( "id" , "idx" )
- 1
- 1
结果如下:
(2)withColumn
:往当前DataFrame中新增一列
whtiColumn(colName: String , col: Column)
方法根据指定colName
往DataFrame中新增一列,如果colName
已存在,则会覆盖当前列。
以下代码往jdbcDF
中新增一个名为id2
的列,
jdbcDF.withColumn("id2", jdbcDF("id")).show( false)
- 1
- 1
结果如下,
14、行转列
有时候需要根据某个字段内容进行分割,然后生成多行,这时可以使用explode
方法
下面代码中,根据c3
字段中的空格将字段内容进行分割,分割的内容存储在新的字段c3_
中,如下所示
jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}
- 1
- 1
结果如下,
以上是关于Spark-SQL——DataFrame与Dataset的主要内容,如果未能解决你的问题,请参考以下文章