Spark-SQL——DataFrame与Dataset

Posted Xsqone

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark-SQL——DataFrame与Dataset相关的知识,希望对你有一定的参考价值。

文章目录

一、Spark SQL概述

1.1、Spark SQL是什么?

Spark SQL是Apache Spark中用于结构化数据处理的模块。它允许开发人员在Spark上执行SQL查询、处理结构化数据以及将它们与常规的RDD一起使用。Spark Sql提供了用于处理结构化数据的高级API,如DataFrames和Datasets,它们比原始的RDD API更加高效和方便。
通过Spark SQL,可以使用标准的SQL语言进行数据处理,也可以使用DataFrame API进行操作。此外,Spark SQL还可以集成Hive,允许用户直接查询Hive表并将查询结果存储到Hive中。Spark SQL也支持Spark中运行标准的JDBC/ODBC连接,从而可以使用各种外部数据源。

1.2、Hive和Spark SQL

Hive和Spark SQL都是用来处理大数据的工具,主要是基于Hadoop生态圈。它们的相同点都是用来查询和处理大规模数据的,而且都可以使用类SQL语言来进行操作。

不同之处:

  • 操作语言不同:Hive使用HQL(Hive Query Language)进行数据操作,而Spark SQL使用Spark SQL语法进行数据操作。
  • 数据处理方式不同:Hive依赖于MapReduce作为计算引擎,而Spark SQL使用Spark作为计算引擎。由于Spark的内存计算能力较强,所以在某些场景下,Spark SQL比Hive更加高效。
  • 数据存储方式不同:Hive使用HDFS或者其他支持Hadoop HDFS API的存储系统来存储数据,而Spark SQL可以支持多种不同的数据存储系统,例如:HDFS、Hive等。
  • 性能不同:Spark SQL的性能要比Hive快得多,主要是因为Spark Sql使用了内存计算技术,而Hive使用的是MapReduce计算模型。
  • 执行计划不同:Hive的执行计划是通过HQL生成的,而Spark Sql的执行计划是通过Spark的优化器生成的。Spark的优化器可以对查询进行优化,以提高查询的性能。

1.3、DataFrame与DataSet

  • DataFrame
    • DataFrame是Spark SQL的一种数据抽象,它表示分布式数据集合。DataFrame和关系型数据库中的表类似,都有列和行的概念,而且还具备了分布式的特性。DataFrame提供了丰富的数据操作接口,例如:选择、过滤、分组、聚合、排序、连接等。这些操作和SQL语句类似,便于使用和理解。
    • DataFrame可以看作是由一系列Row对象组成的RDD,每个Row代表着一行数据。不同于RDD,DataFrame具有数据类型的概念,这意味着DataFrame的每一列都有其对应的数据类型,这一特性使得Spark能够进行更为智能的优化,例如通过列式存储来提升查询效率。同时DataFrame的API使用了强类型语言的特性,使得在编译期就能够捕获到许多错误,提高了开发效率。
  • Dataset
    • Dataset是Spark中的一种强类型API,它结合了Spark SQL中的DataFrame和RDD的优点。
    • Dataset API提供了编译时类型安全和面向对象的编程接口,并利用Spark SQL的优化执行引擎来提高查询性能。它可以使用Scala、Java和Python等语言进行操作,支持多种数据源,如Parquet、JSON、JDBC等。
    • Dataset API还提供了对关系型数据和面向对象数据的支持,可将其转换为Dataset,通过统一的编程接口进行处理和操作。
  • 二者不同:
    • 数据类型:DataFrame 是一种以列为基础的分布式数据集合,强调数据的结构,即 schema,可以使用 SQL 或者 DataFrame 的 API 进行操作;Dataset 是一种类型化的分布式数据集合,强调类型的安全性,可以使用强类型的 API 进行操作。
    • 类型检查:DataFrame 只有在运行时才会进行类型检查,可能在运行时出现类型错误;Dataset 利用 Spark 强大的编译器来捕捉类型错误,可以在编译时就发现类型错误。
    • API:DataFrame 的 API 是基于 DataFrame 和 SQL 的 API,使用较为灵活,但是对于类型的检查较为弱;Dataset 的 API 是基于 Scala 和 Java 的编程语言,可以保证类型的安全性,但是使用的时候会相对复杂一些。
    • 编译时优化:Dataset 可以受益于 Spark 的 Catalyst 查询优化器,在编译时进行优化,可以提高性能;DataFrame 只有在运行时才能够获得优化。
    • 语言支持:DataFrame 可以使用 Scala,Java,Python 和 R 等编程语言,比 Dataset 支持的编程语言更多。Dataset 只支持 Scala 和 Java。
    • 性能:由于 Dataset 强制类型检查,所以可以获得更好的性能,因此在需要高性能的场景下推荐使用 Dataset。但是对于某些特定场景下,DataFrame 可能会更适合。

二、Spark SQL编程

Spark Core 中,如果想要执行应用程序,需要首先构建上下文环境对象 SparkContext,Spark SQL 其实可以理解为对 Spark Core 的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。
SparkSession 是 Spark 最新的 SQL 查询起始点,实质上是 SQLContext 和 HiveContext的组合,所以在 SQLContext 和 HiveContext 上可用的 API 在 SparkSession 上同样是可以使用的。SparkSession 内部封装了 SparkContext,所以计算实际上是由 sparkContext 完成的。当我们使用 spark-shell 的时候, spark 框架会自动的创建一个名称叫做 spark 的 SparkSession 对象, 就像我们以前可以自动获取到一个 sc 来表示 SparkContext 对象一样

2.1、DataFrame

2.1.1、创建DataFrame

在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从HiveTable进行查询返回。

  • 从Spark数据源进行创建
    • Spark支持的创建文件的数据源格式:
    • 创建文件user.json文件
      “username”:“zhangsan”,“age”:20
    • 读取json文件创建DataFrame
      scala> spark.read.json("file:///opt/stufile/user.json")
      res2: org.apache.spark.sql.DataFrame = [age: bigint, username: string]          
      
    • 结果
      scala> res2.show()
      +---+--------+
      |age|username|
      +---+--------+
      | 20|zhangsan|
      +---+--------+
      
  • RDD与DataFrame互转
    • 在IDEA中开发程序时,如果需要RDD与DF或者DS之间进行互相操作,那么需要引入 import spark.implicits._
    • 在spark-shell中无需导入,自动完成此操作
    • 创建样例类
      scala> case class User(name:String,age:Int)
      defined class User
      
    • 创建RDD
      sc.makeRDD(List(("zhangsan",30),("lisi",20)))
      res4: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at makeRDD at <console>:25
      
      
    • 转换DataFrame
      scala> res5.map(t => User(t._1,t._2)).toDF
      res7: org.apache.spark.sql.DataFrame = [name: string, age: int]
      
      scala> res7.show
      +--------+---+
      |    name|age|
      +--------+---+
      |zhangsan| 30|
      |    lisi| 20|
      +--------+---+
      
      
    • 转换为RDD
      scala> res7.rdd
      res9: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[21] at rdd at <console>:26
      
      

2.1.2、SQL语法

SQL语法风格是指我们查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。

df.createOrReplaceTempView(“people”) 创建临时表(创建或重写临时表)
df.createOrReplaceGlobalTempView(“people”) 创建全局表(创建或重写全局表)
**注意:**普通临时表是Session范围内的,如果想要应用范围内有效,可以使用全局临时表。使用全局临时表是需要全路径访问,前面加上global_temp. 例如:select * from global_temp.people

  • 读取JSON文件创建DataFrame
scala> val df = spark.read.json("file:///opt/stufile/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string] 
  • 对DataFrame创建一个临时表
scala> df.createOrReplaceTempView("people")
  • 通过SQL语句实现查询全表
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  • 结果展示
scala> sqlDF.show
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+
  • 创建全局表
scala> df.createOrReplaceGlobalTempView("people")
  • 查询全表
scala> spark.sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+
scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+

2.1.3、DSL语法

DataFrame提供一个特定领域语言(domain-specific language,DSL)去管理结构化的数据。可以在Scala、Java、Python等语言中使用DSL,使用DSL语法风格不必去创建临时表。

  • 创建一个DataFrame
    scala> val df = spark.read.json("file:///opt/stufile/user.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, username: string] 
       
    
  • 查看DataFrame的Schema信息
    	scala> df.printSchema
    	root
    	 |-- age: long (nullable = true)
    	 |-- username: string (nullable = true)
    
  • 只查看username列数据
    scala> df.select("username").show()
    +--------+
    |username|
    +--------+
    |zhangsan|
    | lisi|
    | wangwu|
    +--------+
    
  • 查看username列数据以及age+1数据
    scala> df.select($"username",$"age" + 1).show
    scala> df.select('username, 'age + 1).show()
    scala> df.select('username, 'age + 1 as "newage").show()
    +--------+---------+
    |username|(age + 1)|
    +--------+---------+
    |zhangsan| 21|
    | lisi| 31|
    | wangwu| 41|
    +--------+---------+
    
  • 查看age大于30的数据
    scala> df.filter($"age">30).show
    +---+---------+
    |age| username|
    +---+---------+
    | 40| wangwu|
    +---+---------+
    
  • 按照age分组,查看数据条数
    scala> df.groupBy("age").count.show
    +---+-----+
    |age|count|
    +---+-----+
    | 20| 1|
    | 30| 1|
    | 40| 1|
    +---+-----+
    

2.2、Dataset

DataSet是具有强类型的数据集合,需要提供对于的类型信息。

2.2.1、创建DataSet

  • 使用样例类序列创建Dataset
    scala> case class Person(name: String, age: Long)
    defined class Person
    scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()
    caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]
    scala> caseClassDS.show
    +---------+---+
    | name|age|
    +---------+---+
    | zhangsan| 2|
    +---------+---+
    
  • 使用基本类型的序列创建Dataset
    scala> val ds = Seq(1,2,3,4,5).toDS
    ds: org.apache.spark.sql.Dataset[Int] = [value: int]
    scala> ds.show
    +-----+
    |value|
    +-----+
    | 1|
    | 2|
    | 3|
    | 4|
    | 5|
    +-----+
    

2.2.2、RDD与Dataset互转

  • RDD转换Dataset

    SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。

    scala> case class User(name:String, age:Int)
    defined class User
    
    scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
    res0: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
    
    scala> res0.show
    +--------+---+                                                                  
    |    name|age|
    +--------+---+
    |zhangsan| 30|
    |    lisi| 49|
    +--------+---+
    
    
  • Dataset转换为RDD

    scala> case class User(name:String, age:Int)
    defined class User
    scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1,t._2)).toDS
    res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
    scala> val rdd = res11.rdd
    rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25
    scala> rdd.collect
    res12: Array[User] = Array(User(zhangsan,30), User(lisi,49))
    

2.2.3、DataFrame 和 DataSet 转换

  • DataFrame 转换为 DataSet
    scala> case class User(name:String, age:Int)
    defined class User
    scala> val df = sc.makeRDD(List(("zhangsan",30),
    ("lisi",49))).toDF("name","age")
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    scala> val ds = df.as[User]
    ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
    
  • DataSet 转换为 DataFrame
    scala> val ds = df.as[User]
    ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
    scala> val df = ds.toDF
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    

Spark-SQL之DataFrame操作大全

 Spark SQL中的DataFrame类似于一张关系型数据表。在关系型数据库中对单表或进行的查询操作,在DataFrame中都可以通过调用其API接口来实现。可以参考,Scala提供的DataFrame API

  本文中的代码基于Spark-1.6.2的文档实现。

一、DataFrame对象的生成

  Spark-SQL可以以其他RDD对象、parquet文件、json文件、Hive表,以及通过JDBC连接到其他关系型数据库作为数据源来生成DataFrame对象。本文将以MySQL数据库为数据源,生成DataFrame对象后进行相关的DataFame之上的操作。 
  文中生成DataFrame的代码如下:

object DataFrameOperations {
  def main (args: Array[String ]) {
    val sparkConf = new SparkConf().setAppName( "Spark SQL DataFrame Operations").setMaster( "local[2]" )
    val sparkContext = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sparkContext)
    val url = "jdbc:mysql://m000:3306/test"

    val jdbcDF = sqlContext.read.format( "jdbc" ).options(
      Map( "url" -> url,
        "user" -> "root",
        "password" -> "root",
        "dbtable" -> "spark_sql_test" )).load()

    val joinDF1 = sqlContext.read.format( "jdbc" ).options(
      Map("url" -> url ,
        "user" -> "root",
        "password" -> "root",
        "dbtable" -> "spark_sql_join1" )).load()

    val joinDF2 = sqlContext.read.format( "jdbc" ).options(
      Map ( "url" -> url ,
        "user" -> "root",
        "password" -> "root",
        "dbtable" -> "spark_sql_join2" )).load()

    ... ...
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

  后续代码都在上面... ...处。

二、DataFrame对象上Action操作

1、show:展示数据

  以表格的形式在输出中展示jdbcDF中的数据,类似于select * from spark_sql_test的功能。 
  show方法有四种调用方式,分别为, 
(1)show 
  只显示前20条记录。 
  示例:

jdbcDF.show
  • 1
  • 1

  结果: 
  技术分享

(2)show(numRows: Int) 
  显示numRows条 
  示例:

jdbcDF.show(3)
  • 1
  • 1

  结果: 
  技术分享

(3)show(truncate: Boolean) 
  是否最多只显示20个字符,默认为true。 
  示例:

jdbcDF.show(true)
jdbcDF.show(false)
  • 1
  • 2
  • 1
  • 2

  结果: 
  技术分享

(4)show(numRows: Int, truncate: Boolean) 
  综合前面的显示记录条数,以及对过长字符串的显示格式。 
  示例:

jdbcDF.show(3, false)
  • 1
  • 1

  结果: 
  技术分享

2、collect:获取所有数据到数组

  不同于前面的show方法,这里的collect方法会将jdbcDF中的所有数据都获取到,并返回一个Array对象。

jdbcDF.collect()
  • 1
  • 1

  结果如下,结果数组包含了jdbcDF的每一条记录,每一条记录由一个GenericRowWithSchema对象来表示,可以存储字段名及字段值。 
  技术分享

3、collectAsList:获取所有数据到List

  功能和collect类似,只不过将返回结构变成了List对象,使用方法如下

jdbcDF.collectAsList()
  • 1
  • 1

  结果如下, 
  技术分享

4、describe(cols: String*):获取指定字段的统计信息

  这个方法可以动态的传入一个或多个String类型的字段名,结果仍然为DataFrame对象,用于统计数值类型字段的统计值,比如count, mean, stddev, min, max等。 
  使用方法如下,其中c1字段为字符类型,c2字段为整型,c4字段为浮点型

jdbcDF .describe("c1" , "c2", "c4" ).show()
  • 1
  • 1

  结果如下, 
  技术分享

5、first, head, take, takeAsList:获取若干行记录

  这里列出的四个方法比较类似,其中 
  (1)first获取第一行记录 
  (2)head获取第一行记录,head(n: Int)获取前n行记录 
  (3)take(n: Int)获取前n行数据 
  (4)takeAsList(n: Int)获取前n行数据,并以List的形式展现 
  以Row或者Array[Row]的形式返回一行或多行数据。firsthead功能相同。 
  taketakeAsList方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生OutOfMemoryError

  使用和结果略。

二、DataFrame对象上的条件查询和join等操作

  以下返回为DataFrame类型的方法,可以连续调用。

1、where条件相关

(1)where(conditionExpr: String):SQL语言中where关键字后的条件 
  传入筛选条件表达式,可以用andor。得到DataFrame类型的返回结果, 
  示例:

jdbcDF .where("id = 1 or c1 = ‘b‘" ).show()
  • 1
  • 1

  结果, 
  技术分享

(2)filter:根据字段进行筛选 
  传入筛选条件表达式,得到DataFrame类型的返回结果。和where使用条件相同 
  示例:

jdbcDF .filter("id = 1 or c1 = ‘b‘" ).show()
  • 1
  • 1

  结果, 
  技术分享

2、查询指定字段

(1)select:获取指定字段值 
  根据传入的String类型字段名,获取指定字段的值,以DataFrame类型返回 
  示例:

jdbcDF.select( "id" , "c3" ).show( false)
  • 1
  • 1

  结果: 
  技术分享

  还有一个重载的select方法,不是传入String类型参数,而是传入Column类型参数。可以实现select id, id+1 from test这种逻辑。

jdbcDF.select(jdbcDF( "id" ), jdbcDF( "id") + 1 ).show( false)
  • 1
  • 1

  结果: 
  技术分享

  能得到Column类型的方法是apply以及col方法,一般用apply方法更简便。

(2)selectExpr:可以对指定字段进行特殊处理 
  可以直接对指定字段调用UDF函数,或者指定别名等。传入String类型参数,得到DataFrame对象。 
  示例,查询id字段,c3字段取别名timec4字段四舍五入:

jdbcDF .selectExpr("id" , "c3 as time" , "round(c4)" ).show(false)
  • 1
  • 1

  结果, 
  技术分享

(3)col:获取指定字段 
  只能获取一个字段,返回对象为Column类型。 
  val idCol = jdbcDF.col(“id”)果略。

(4)apply:获取指定字段 
  只能获取一个字段,返回对象为Column类型 
  示例:

val idCol1 = jdbcDF.apply("id")
val idCol2 = jdbcDF("id")
  • 1
  • 2
  • 1
  • 2

  结果略。

(5)drop:去除指定字段,保留其他字段 
  返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。 
  示例:

jdbcDF.drop("id")
jdbcDF.drop(jdbcDF("id"))
  • 1
  • 2
  • 1
  • 2

  结果: 
  技术分享

3、limit

  limit方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和takehead不同的是,limit方法不是Action操作。

jdbcDF.limit(3).show( false)
  • 1
  • 1

  结果, 
  技术分享

4、order by

(1)orderBysort:按指定字段排序,默认为升序 
  示例1,按指定字段排序。加个-表示降序排序。sortorderBy使用方法相同

jdbcDF.orderBy(- jdbcDF("c4")).show(false)
// 或者
jdbcDF.orderBy(jdbcDF("c4").desc).show(false)
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

  结果, 
  技术分享

  示例2,按字段字符串升序排序

jdbcDF.orderBy("c4").show(false)
  • 1
  • 1

  结果, 
  技术分享

(2)sortWithinPartitions 
  和上面的sort方法功能类似,区别在于sortWithinPartitions方法返回的是按Partition排好序的DataFrame对象。

5、group by

(1)groupBy:根据字段进行group by操作 
  groupBy方法有两种调用方式,可以传入String类型的字段名,也可传入Column类型的对象。 
  使用方法如下,

jdbcDF .groupBy("c1" )
jdbcDF.groupBy( jdbcDF( "c1"))
  • 1
  • 2
  • 1
  • 2

(2)cuberollup:group by的扩展 
  功能类似于SQL中的group by cube/rollup,略。

(3)GroupedData对象 
  该方法得到的是GroupedData类型对象,在GroupedData的API中提供了group by之后的操作,比如,

  • max(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段
  • min(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段
  • mean(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段
  • sum(colNames: String*)方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段
  • count()方法,获取分组中的元素个数

      运行结果示例: 
      count 
      技术分享

      max 
      技术分享

      这里面比较复杂的是以下两个方法, 
    agg,该方法和下面介绍的类似,可以用于对指定字段进行聚合操作。

pivot

6、distinct

(1)distinct:返回一个不包含重复记录的DataFrame 
  返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()方法不传入指定字段时的结果相同。 
  示例:

jdbcDF.distinct()
  • 1
  • 1

  结果, 
  技术分享

(2)dropDuplicates:根据指定字段去重 
  根据指定字段去重。类似于select distinct a, b操作 
  示例:

jdbcDF.dropDuplicates(Seq("c1"))
  • 1
  • 1

  结果: 
  技术分享

7、聚合

  聚合操作调用的是agg方法,该方法有多种调用方式。一般与groupBy方法配合使用。 
  以下示例其中最简单直观的一种用法,对id字段求最大值,对c4字段求和。

jdbcDF.agg("id" -> "max", "c4" -> "sum")
  • 1
  • 1

  结果: 
  技术分享

8、union

  unionAll方法:对两个DataFrame进行组合 
  类似于SQL中的UNION ALL操作。 
  示例:

jdbcDF.unionALL(jdbcDF.limit(1))
  • 1
  • 1

  结果: 
  技术分享

9、join

  重点来了。在SQL语言中用得很多的就是join操作,DataFrame中同样也提供了join的功能。 
  接下来隆重介绍join方法。在DataFrame中提供了六个重载的join方法。 
(1)、笛卡尔积

joinDF1.join(joinDF2)
  • 1
  • 1

(2)、using一个字段形式 
  下面这种join类似于a join b using column1的形式,需要两个DataFrame中有相同的一个列名,

joinDF1.join(joinDF2, "id")
  • 1
  • 1

  joinDF1joinDF2根据字段id进行join操作,结果如下,using字段只显示一次。 
  技术分享

(3)、using多个字段形式 
  除了上面这种using一个字段的情况外,还可以using多个字段,如下

joinDF1.join(joinDF2, Seq("id", "name"))
  • 1
  • 1

(4)、指定join类型 
  两个DataFrame的join操作有inner, outer, left_outer, right_outer, leftsemi类型。在上面的using多个字段的join情况下,可以写第三个String类型参数,指定join的类型,如下所示

joinDF1.join(joinDF2, Seq("id", "name"), "inner")
  • 1
  • 1

(5)、使用Column类型来join 
  如果不用using模式,灵活指定join字段的话,可以使用如下形式

joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"))
  • 1
  • 1

  结果如下, 
  技术分享

(6)、在指定join字段同时指定join类型 
  如下所示

joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"), "inner")
  • 1
  • 1

10、获取指定字段统计信息

  stat方法可以用于计算指定字段或指定字段之间的统计信息,比如方差,协方差等。这个方法返回一个DataFramesStatFunctions类型对象。 
  下面代码演示根据c4字段,统计该字段值出现频率在30%以上的内容。在jdbcDF中字段c1的内容为"a, b, a, c, d, b"。其中ab出现的频率为2 / 6,大于0.3

jdbcDF.stat.freqItems(Seq ("c1") , 0.3).show()
  • 1
  • 1

  结果如下: 
  技术分享

11、获取两个DataFrame中共有的记录

  intersect方法可以计算出两个DataFrame中相同的记录,

jdbcDF.intersect(jdbcDF.limit(1)).show(false)
  • 1
  • 1

  结果如下: 
  技术分享

12、获取一个DataFrame中有另一个DataFrame中没有的记录

  示例:

jdbcDF.except(jdbcDF.limit(1)).show(false)
  • 1
  • 1

  结果如下, 
  技术分享

13、操作字段名

(1)withColumnRenamed:重命名DataFrame中的指定字段名 
  如果指定的字段名不存在,不进行任何操作。下面示例中将jdbcDF中的id字段重命名为idx

jdbcDF.withColumnRenamed( "id" , "idx" )
  • 1
  • 1

  结果如下: 
  技术分享

(2)withColumn:往当前DataFrame中新增一列 
  whtiColumn(colName: String , col: Column)方法根据指定colName往DataFrame中新增一列,如果colName已存在,则会覆盖当前列。 
  以下代码往jdbcDF中新增一个名为id2的列,

jdbcDF.withColumn("id2", jdbcDF("id")).show( false)
  • 1
  • 1

  结果如下, 
  技术分享

14、行转列

  有时候需要根据某个字段内容进行分割,然后生成多行,这时可以使用explode方法 
  下面代码中,根据c3字段中的空格将字段内容进行分割,分割的内容存储在新的字段c3_中,如下所示

jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}
  • 1
  • 1

  结果如下, 
  技术分享


























































































以上是关于Spark-SQL——DataFrame与Dataset的主要内容,如果未能解决你的问题,请参考以下文章

Spark-SQL之DataFrame操作大全

Spark-SQL之DataFrame操作大全

Spark-Sql之DataFrame实战详解

Spark-SQL之DataFrame操作

spark-sql将Rdd转换为DataFrame进行操作的两种方法

使用 spark-sql 缓存临时表