spark知识体系04-SQL,DataFrames,DateSets
Posted molyeo
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark知识体系04-SQL,DataFrames,DateSets相关的知识,希望对你有一定的参考价值。
简介
Spark SQL 是 Spark 处理结构化数据的一个模块.与基础的 Spark RDD API 不同, Spark SQL 提供了查询结构化数据及计算结果等信息的接口.在内部, Spark SQL 使用这个额外的信息去执行额外的优化.有几种方式可以跟 Spark SQL 进行交互, 包括 SQL 和 Dataset API.当使用相同执行引擎进行计算时, 无论使用哪种 API / 语言都可以快速的计算.这种统一意味着开发人员能够在基于提供最自然的方式来表达一个给定的 transformation API 之间实现轻松的来回切换不同的 .
SQL
Spark SQL 的功能之一是执行 SQL 查询.Spark SQL 也能够被用于从已存在的 Hive 环境中读取数据.更多关于如何配置这个特性的信息, 请参考 Hive 表 这部分. 当以另外的编程语言运行SQL 时, 查询结果将以 Dataset/DataFrame的形式返回.
Datasets and DataFrames
一个 Dataset 是一个分布式的数据集合,它提供了 RDD 的优点(强类型化, 能够使用强大的 lambda 函数)与Spark SQL执行引擎的优点.一个 Dataset 可以从 JVM 对象来 构造 并且使用转换功能(map, flatMap, filter, 等等). Dataset API 在Scala 和 Java是可用的.Python 不支持 Dataset API.但是由于 Python 的动态特性, 许多 Dataset API 的优点已经可用了 (也就是说, 你可能通过 name 天生的row.columnName属性访问一行中的字段).这种情况和 R 相似.
一个 DataFrame 是一个 Dataset 组成的指定列. DataFrames 可以从大量的 sources 中构造出来, 比如: 结构化的文本文件, Hive中的表, 外部数据库, 或者已经存在的 RDDs. 在 the Scala API中, DataFrame 仅仅是一个 Dataset[Row]类型的别名(type DataFrame = Dataset[Row]
)。
DataSet创建
在一个 SparkSession中, 应用程序可以从一个 已经存在的 RDD, 从hive表, 或者从 Spark数据源中创建一个DataFrames.
举个例子, 下面就是基于一个JSON文件创建一个DataFrame:
scala> val spark = SparkSession.builder().appName("DataFrameTest").getOrCreate()
18/03/07 10:42:40 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = [email protected]
scala> val df = spark.read.json("D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
DataSet基本操作
DataFrame基本操作
基本操作主要是简单的列引用和表达式,示例如下,同时也有丰富的函数库,包括string操作, date算术。
scala> import spark.implicits._
import spark.implicits._
scala>
scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> df.select($"name", $"age" + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.filter($"age" > 20).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.groupBy($"age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
局部视图和全局视图
SparkSession 的 sql 函数可以让应用程序以编程的方式运行 SQL 查询, 并将结果作为一个 DataFrame 返回。
Spark SQL中的临时视图是session级别的, 也就是会随着session的消失而消失. 如果你想让一个临时视图在所有session中相互传递并且可用, 直到Spark 应用退出, 你可以建立一个全局的临时视图.全局的临时视图存在于系统数据库 global_temp中, 我们必须加上库名去引用。
scala> //临时表和全局表
scala> df.createOrReplaceTempView("people")
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+------
scala> df.createOrReplaceGlobalTempView("people")
scala> spark.sql("SELECT * FROM global_temp.people").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
DataFrame创建
DataSet不同于RDD,没有使用Java序列化器或者Kryo进行序列化,而是使用一个特定的编码器进行序列化,这些序列化器可以自动生成,而且在spark执行很多操作(过滤、排序、hash)的时候不用进行反序列化。
scala> case class Person(name: String, age: Long)
defined class Person
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> caseClassDS.show()
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
scala> val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> primitiveDS.map(_ + 1).collect()
res28: Array[Int] = Array(2, 3, 4)
scala> val path = "D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.json"
path: String = D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.json
scala> val peopleDS = spark.read.json(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> peopleDS.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
RDD和DataSet相互转换
Spark SQL 支持两种不同的方法用于将存在的RDD转成Datasets。
第一种方法使用反射去推断包含特定类型对象的RDD模式(schema),该模式使你的代码更加简练,不过你必须在写Spark的程序的时候已经知道模式信息(比如RDD中的对象是自己定义的case class类型)。
第二种方法是通过一个编程接口,此时你需要构造一个模式,将其应用到一个已经存在的RDD上将其转化为DataFrame,该方法适用于运行之前还不知道列以及列的类型。
使用反射推断Schema
Spark SQL 的 Scala 接口支持自动转换一个包含 case classes 的 RDD 为 DataFrame.Case class 定义了表的 Schema.Case class 的参数名使用反射读取并且成为了列名.Case class 也可以是嵌套的或者包含像 Seq 或者 Array 这样的复杂类型.这个 RDD 能够被隐式转换成一个 DataFrame 然后被注册为一个表.表可以用于后续的 SQL 语句。
scala> val peopleDF = spark.read.textFile("D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.txt").map(_.split(",")).map(attributes => People(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> peopleDF.createOrReplaceTempView("people")
scala> val teenagerDF = spark.sql("SELECT name,age FROM people where age BETWEEN 13 AND 19")
teenagerDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> teenagerDF.map(teenager => "Name:" + teenager(0)).show()
+-----------+
| value|
+-----------+
|Name:Justin|
+-----------+
scala> teenagerDF.map(teenager => "Name:" + teenager.getAs[String]("name")).show()
+-----------+
| value|
+-----------+
|Name:Justin|
+-----------+
scala> implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
mapEncoder: org.apache.spark.sql.Encoder[Map[String,Any]] = class[value[0]: binary]
scala> teenagerDF.map(teenagerDF => teenagerDF.getValuesMap[Any](List("name", "age"))).collect()
res33: Array[Map[String,Any]] = Array(Map(name -> Justin, age -> 19))
以编程的方式指定Schema
当case class不能提前定义时(比如记录的结构被编码为字符串,或者当文本数据集被解析时不同用户需要映射不同的字段),可以通过下面三步来将RDD转换为DataFrame:
1、从原始RDD创建得到一个包含Row对象的RDD。
2、创建一个与第1步中Row的结构相匹配的StructType,以表示模式信息。
3、通过createDataFrame()将模式信息应用到第1步创建的RDD上。
scala> val schemaString = "name age"
schemaString: String = name age
scala> val fields = schemaString.split(" ").map(field => StructField(field, StringType, nullable = true))
fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,StringType,true))
scala> val schema = StructType(fields)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,StringType,true))
scala>
scala> val peopleRDD = spark.sparkContext.textFile("D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.txt MapPartitionsRDD[115] at textFile at <console>:47
scala> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[117] at map at <console>:49
scala>
scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: string]
scala> peopleDF.createOrReplaceTempView("people")
scala> val results = spark.sql("SELECT name FROM people")
results: org.apache.spark.sql.DataFrame = [name: string]
scala> results.map(attributes => "Name: " + attributes(0)).show()
+-------------+
| value|
+-------------+
|Name: Michael|
| Name: Andy|
| Name: Justin|
+-------------+
聚合函数
SparkSQL提供了常用的聚合函数如count(), countDistinct(), avg(), max(), min(),当上述函数不满足使用要求时,用户可以自定义聚合函数。
自定义聚合函数(无类型)
用户通过实现UserDefinedAggregateFunction的相关接口来创建聚合函数,UserDefinedAggregateFunction相关接口的含义见代码所示,如下定义求平均值的函数MyAverage。
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Encoders, Encoder, Row, SparkSession}
object MyAverage extends UserDefinedAggregateFunction {
// 输入参数数据类型
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
//聚合缓存数据类型
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// 输出参数数据类型
def dataType: DataType = DoubleType
//相同的输入是否返回相同的输出
def deterministic: Boolean = true
//初始化聚合缓存
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
//聚合缓存更新(buffer和row)
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
//聚合缓存合并(buffer和buffer)
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
//计算最终结果
def evaluate(buffer: Row): Double = {
buffer.getLong(0).toDouble / buffer.getLong(1)
}
}
定义完成并注册后,则可以像内建函数一样使用。
scala> spark.udf.register("myAverage", MyAverage)
res0: org.apache.spark.sql.expressions.UserDefinedAggregateFunction = [email protected]
scala> val df = spark.read.json("D:/workspace_spark/spark-2.2.0/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
scala> df.createOrReplaceTempView("employees")
scala> df.show()
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
scala> val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result: org.apache.spark.sql.DataFrame = [average_salary: double]
scala> result.show()
+--------------+
|average_salary|
+--------------+
| 3750.0|
+--------------+
自定义类型安全的聚合函数
自定义类型安全的聚合函数是通过实现org.apache.spark.sql.expressions.Aggregator
的接口来完成的,一个求平均值的聚合函数示例如下:
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Encoders, Encoder, Row, SparkSession}
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage2 extends Aggregator[Employee, Average, Double] {
/**
* A zero value for this aggregation. Should satisfy the property that any b + zero = b.
* @since 1.6.0
*/
override def zero: Average = Average(0L, 0L)
/**
* Combine two values to produce a new value. For performance, the function may modify `b` and
* return it instead of constructing new object for b.
* @since 1.6.0
*/
override def reduce(b: Average, a: Employee): Average = {
b.sum = b.sum + a.salary
b.count = b.count + 1
b
}
/**
* Transform the output of the reduction.
* @since 1.6.0
*/
override def finish(reduction: Average): Double = {
reduction.sum.toDouble / reduction.count
}
/**
* Merge two intermediate values.
* @since 1.6.0
*/
override def merge(b1: Average, b2: Average): Average = {
b1.sum = b1.sum + b2.sum
b1.count = b1.count + b2.count
b1
}
/**
* Specifies the `Encoder` for the intermediate value type.
* @since 2.0.0
*/
override def bufferEncoder: Encoder[Average] = Encoders.product
/**
* Specifies the `Encoder` for the final ouput value type.
* @since 2.0.0
*/
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
数据源
DataFrame 可以当做标准的RDD进行操作,也可以注册为一个临时表。将DataFrame注册为一个临时表,准许你在上执行SQL查询。
DataFrame接口可以处理多种数据源,SparkSql 也内建若干个有用的数据源格式(Json、parquet、jdbc)。此外,当你用SQL查询的数据源的时候,只使用了一部分字段,SparkSQL可以智能扫描这些字段。
标准的加载和保存函数
在最简单的形式中, 默认数据源(parquet, 除非另有配置 spark.sql.sources.default )将用于所有操作.
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
手工指定选项
你可以通过手工指定数据源和任何想要传递给数据源的选项。指定数据源通常需要使用数据源全名(如org.apache.spark.sql.parquet),但对于内建数据源,你也可以使用它们的短名(json、parquet和jdbc)。并且不同的数据源类型之间都可以相互转换。
示例如下:
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
直接在文件上运行 SQL
不使用读取 API 将文件加载到 DataFrame 并进行查询, 也可以直接用 SQL 查询该文件.
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Save Modes (保存模式)
Save operations (保存操作)可以选择使用 SaveMode , 它指定如何处理现有数据如果存在的话. 重要的是要意识到, 这些 save modes (保存模式)不使用任何 locking (锁定)并且不是 atomic (原子). 另外, 当执行 Overwrite 时, 数据将在新数据写出之前被删除.
Saving to Persistent Tables (保存到持久表)
DataFrames 也可以使用 saveAsTable 命令作为 persistent tables (持久表)保存到 Hive metastore 中. 请注意, existing Hive deployment (现有的 Hive 部署)不需要使用此功能. Spark 将为您创建默认的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 与 createOrReplaceTempView 命令不同, saveAsTable 将 materialize (实现) DataFrame 的内容, 并创建一个指向 Hive metastore 中数据的指针. 即使您的 Spark 程序重新启动, Persistent tables (持久性表)仍然存在, 因为您保持与同一个 metastore 的连接. 可以通过使用表的名称在 SparkSession 上调用 table 方法来创建 persistent tabl (持久表)的 DataFrame .
对于 file-based (基于文件)的 data source (数据源), 例如 text, parquet, json等, 您可以通过 path 选项指定 custom table path (自定义表路径), 例如 df.write.option("path", "/some/path").saveAsTable("t") . 当表被 dropped (删除)时, custom table path (自定义表路径)将不会被删除, 并且表数据仍然存在. 如果未指定自定义表路径, Spark 将把数据写入 warehouse directory (仓库目录)下的默认表路径. 当表被删除时, 默认的表路径也将被删除.
Bucketing, Sorting and Partitioning (分桶, 排序和分区)
对于 file-based data source (基于文件的数据源), 也可以对 output (输出)进行 bucket 和 sort 或者 partition . Bucketing 和 sorting 仅适用于 persistent tables :
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
在使用 Dataset API 时, partitioning 可以同时与 save 和 saveAsTable 一起使用.
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
可以为 single table (单个表)使用 partitioning 和 bucketing:
peopleDF.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("people_partitioned_bucketed")
partitionBy 创建一个 directory structure (目录结构), 如 Partition Discovery 部分所述. 因此, 对 cardinality (基数)较高的 columns 的适用性有限. 相反, bucketBy 可以在固定数量的 buckets 中分配数据, 并且可以在 a number of unique values is unbounded (多个唯一值无界时)使用数据.
Parquet Files
Parquet 格式是被许多其他的数据处理系统支持的列数据格式类型。Spark Sql支持在读写Parquet文件的时候自动保存原始数据的模式信息。在写Parquet文件时候,所有的列将会因为兼容原因转成nullable。
Loading Data Programmatically (以编程的方式加载数据)
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
Partition Discovery (分区发现)
Table partitioning (表分区)是在像 Hive 这样的系统中使用的常见的优化方法. 在 partitioned table (分区表)中, 数据通常存储在不同的目录中, partitioning column values encoded (分区列值编码)在每个 partition directory (分区目录)的路径中. Parquet data source (Parquet 数据源)现在可以自动 discover (发现)和 infer (推断)分区信息. 例如, 我们可以使用以下 directory structure (目录结构)将所有以前使用的 population data (人口数据)存储到 partitioned table (分区表)中, 其中有两个额外的列 gender 和 country 作为 partitioning columns (分区列):
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
通过将 path/to/table 传递给 SparkSession.read.parquet 或 SparkSession.read.load , Spark SQL 将自动从路径中提取 partitioning information (分区信息). 现在返回的 DataFrame 的 schema (模式)变成:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
请注意, 会自动 inferred (推断) partitioning columns (分区列)的 data types (数据类型).目前, 支持 numeric data types (数字数据类型)和 string type (字符串类型).有些用户可能不想自动推断 partitioning columns (分区列)的数据类型.对于这些用例, automatic type inference (自动类型推断)可以由 spark.sql.sources.partitionColumnTypeInference.enabled 配置, 默认为 true .当禁用 type inference (类型推断)时, string type (字符串类型)将用于 partitioning columns (分区列).
从 Spark 1.6.0 开始, 默认情况下, partition discovery (分区发现)只能找到给定路径下的 partitions (分区).对于上述示例, 如果用户将 path/to/table/gender=male 传递给 SparkSession.read.parquet 或 SparkSession.read.load , 则 gender 将不被视为 partitioning column (分区列).如果用户需要指定 partition discovery (分区发现)应该开始的基本路径, 则可以在数据源选项中设置 basePath.例如, 当 path/to/table/gender=male 是数据的路径并且用户将 basePath 设置为 path/to/table/, gender 将是一个 partitioning column (分区列).
Schema Merging (模式合并)
像 ProtocolBuffer , Avro 和 Thrift 一样, Parquet 也支持 schema evolution (模式演进). 用户可以从一个 simple schema (简单的架构)开始, 并根据需要逐渐向 schema 添加更多的 columns (列). 以这种方式, 用户可能会使用不同但相互兼容的 schemas 的 multiple Parquet files (多个 Parquet 文件). Parquet data source (Parquet 数据源)现在能够自动检测这种情况并 merge (合并)所有这些文件的 schemas .
由于 schema merging (模式合并)是一个 expensive operation (相对昂贵的操作), 并且在大多数情况下不是必需的, 所以默认情况下从 1.5.0 开始. 你可以按照如下的方式启用它:
读取 Parquet 文件时, 将 data source option (数据源选项) mergeSchema 设置为 true (如下面的例子所示)
将 global SQL option (全局 SQL 选项) spark.sql.parquet.mergeSchema 设置为 true。
// This is used to implicitly convert an RDD to a DataFrame. import spark.implicits._ // Create a simple DataFrame, store into a partition directory val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") squaresDF.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") cubesDF.write.parquet("data/test_table/key=2") // Read the partitioned table val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true)
---恢复内容结束---
简介
Spark SQL 是 Spark 处理结构化数据的一个模块.与基础的 Spark RDD API 不同, Spark SQL 提供了查询结构化数据及计算结果等信息的接口.在内部, Spark SQL 使用这个额外的信息去执行额外的优化.有几种方式可以跟 Spark SQL 进行交互, 包括 SQL 和 Dataset API.当使用相同执行引擎进行计算时, 无论使用哪种 API / 语言都可以快速的计算.这种统一意味着开发人员能够在基于提供最自然的方式来表达一个给定的 transformation API 之间实现轻松的来回切换不同的 .
SQL
Spark SQL 的功能之一是执行 SQL 查询.Spark SQL 也能够被用于从已存在的 Hive 环境中读取数据.更多关于如何配置这个特性的信息, 请参考 Hive 表 这部分. 当以另外的编程语言运行SQL 时, 查询结果将以 Dataset/DataFrame的形式返回.
Datasets and DataFrames
一个 Dataset 是一个分布式的数据集合,它提供了 RDD 的优点(强类型化, 能够使用强大的 lambda 函数)与Spark SQL执行引擎的优点.一个 Dataset 可以从 JVM 对象来 构造 并且使用转换功能(map, flatMap, filter, 等等). Dataset API 在Scala 和 Java是可用的.Python 不支持 Dataset API.但是由于 Python 的动态特性, 许多 Dataset API 的优点已经可用了 (也就是说, 你可能通过 name 天生的row.columnName属性访问一行中的字段).这种情况和 R 相似.
一个 DataFrame 是一个 Dataset 组成的指定列. DataFrames 可以从大量的 sources 中构造出来, 比如: 结构化的文本文件, Hive中的表, 外部数据库, 或者已经存在的 RDDs. 在 the Scala API中, DataFrame 仅仅是一个 Dataset[Row]类型的别名(type DataFrame = Dataset[Row]
)。
DataSet创建
在一个 SparkSession中, 应用程序可以从一个 已经存在的 RDD, 从hive表, 或者从 Spark数据源中创建一个DataFrames.
举个例子, 下面就是基于一个JSON文件创建一个DataFrame:
scala> val spark = SparkSession.builder().appName("DataFrameTest").getOrCreate()
18/03/07 10:42:40 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = [email protected]
scala> val df = spark.read.json("D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
DataSet基本操作
DataFrame基本操作
基本操作主要是简单的列引用和表达式,示例如下,同时也有丰富的函数库,包括string操作, date算术。
scala> import spark.implicits._
import spark.implicits._
scala>
scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> df.select($"name", $"age" + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala> df.filter($"age" > 20).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.groupBy($"age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
局部视图和全局视图
SparkSession 的 sql 函数可以让应用程序以编程的方式运行 SQL 查询, 并将结果作为一个 DataFrame 返回。
Spark SQL中的临时视图是session级别的, 也就是会随着session的消失而消失. 如果你想让一个临时视图在所有session中相互传递并且可用, 直到Spark 应用退出, 你可以建立一个全局的临时视图.全局的临时视图存在于系统数据库 global_temp中, 我们必须加上库名去引用。
scala> //临时表和全局表
scala> df.createOrReplaceTempView("people")
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+------
scala> df.createOrReplaceGlobalTempView("people")
scala> spark.sql("SELECT * FROM global_temp.people").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
DataFrame创建
DataSet不同于RDD,没有使用Java序列化器或者Kryo进行序列化,而是使用一个特定的编码器进行序列化,这些序列化器可以自动生成,而且在spark执行很多操作(过滤、排序、hash)的时候不用进行反序列化。
scala> case class Person(name: String, age: Long)
defined class Person
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> caseClassDS.show()
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
scala> val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> primitiveDS.map(_ + 1).collect()
res28: Array[Int] = Array(2, 3, 4)
scala> val path = "D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.json"
path: String = D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.json
scala> val peopleDS = spark.read.json(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> peopleDS.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
RDD和DataSet相互转换
Spark SQL 支持两种不同的方法用于将存在的RDD转成Datasets。
第一种方法使用反射去推断包含特定类型对象的RDD模式(schema),该模式使你的代码更加简练,不过你必须在写Spark的程序的时候已经知道模式信息(比如RDD中的对象是自己定义的case class类型)。
第二种方法是通过一个编程接口,此时你需要构造一个模式,将其应用到一个已经存在的RDD上将其转化为DataFrame,该方法适用于运行之前还不知道列以及列的类型。
使用反射推断Schema
Spark SQL 的 Scala 接口支持自动转换一个包含 case classes 的 RDD 为 DataFrame.Case class 定义了表的 Schema.Case class 的参数名使用反射读取并且成为了列名.Case class 也可以是嵌套的或者包含像 Seq 或者 Array 这样的复杂类型.这个 RDD 能够被隐式转换成一个 DataFrame 然后被注册为一个表.表可以用于后续的 SQL 语句。
scala> val peopleDF = spark.read.textFile("D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.txt").map(_.split(",")).map(attributes => People(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> peopleDF.createOrReplaceTempView("people")
scala> val teenagerDF = spark.sql("SELECT name,age FROM people where age BETWEEN 13 AND 19")
teenagerDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> teenagerDF.map(teenager => "Name:" + teenager(0)).show()
+-----------+
| value|
+-----------+
|Name:Justin|
+-----------+
scala> teenagerDF.map(teenager => "Name:" + teenager.getAs[String]("name")).show()
+-----------+
| value|
+-----------+
|Name:Justin|
+-----------+
scala> implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
mapEncoder: org.apache.spark.sql.Encoder[Map[String,Any]] = class[value[0]: binary]
scala> teenagerDF.map(teenagerDF => teenagerDF.getValuesMap[Any](List("name", "age"))).collect()
res33: Array[Map[String,Any]] = Array(Map(name -> Justin, age -> 19))
以编程的方式指定Schema
当case class不能提前定义时(比如记录的结构被编码为字符串,或者当文本数据集被解析时不同用户需要映射不同的字段),可以通过下面三步来将RDD转换为DataFrame:
1、从原始RDD创建得到一个包含Row对象的RDD。
2、创建一个与第1步中Row的结构相匹配的StructType,以表示模式信息。
3、通过createDataFrame()将模式信息应用到第1步创建的RDD上。
scala> val schemaString = "name age"
schemaString: String = name age
scala> val fields = schemaString.split(" ").map(field => StructField(field, StringType, nullable = true))
fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,StringType,true))
scala> val schema = StructType(fields)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,StringType,true))
scala>
scala> val peopleRDD = spark.sparkContext.textFile("D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = D:/workspace_spark/spark-2.2.0/examples/src/main/resources/people.txt MapPartitionsRDD[115] at textFile at <console>:47
scala> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[117] at map at <console>:49
scala>
scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: string]
scala> peopleDF.createOrReplaceTempView("people")
scala> val results = spark.sql("SELECT name FROM people")
results: org.apache.spark.sql.DataFrame = [name: string]
scala> results.map(attributes => "Name: " + attributes(0)).show()
+-------------+
| value|
+-------------+
|Name: Michael|
| Name: Andy|
| Name: Justin|
+-------------+
聚合函数
SparkSQL提供了常用的聚合函数如count(), countDistinct(), avg(), max(), min(),当上述函数不满足使用要求时,用户可以自定义聚合函数。
自定义聚合函数(无类型)
用户通过实现UserDefinedAggregateFunction的相关接口来创建聚合函数,UserDefinedAggregateFunction相关接口的含义见代码所示,如下定义求平均值的函数MyAverage。
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Encoders, Encoder, Row, SparkSession}
object MyAverage extends UserDefinedAggregateFunction {
// 输入参数数据类型
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
//聚合缓存数据类型
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// 输出参数数据类型
def dataType: DataType = DoubleType
//相同的输入是否返回相同的输出
def deterministic: Boolean = true
//初始化聚合缓存
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
//聚合缓存更新(buffer和row)
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
//聚合缓存合并(buffer和buffer)
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
//计算最终结果
def evaluate(buffer: Row): Double = {
buffer.getLong(0).toDouble / buffer.getLong(1)
}
}
定义完成并注册后,则可以像内建函数一样使用。
scala> spark.udf.register("myAverage", MyAverage)
res0: org.apache.spark.sql.expressions.UserDefinedAggregateFunction = [email protected]
scala> val df = spark.read.json("D:/workspace_spark/spark-2.2.0/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
scala> df.createOrReplaceTempView("employees")
scala> df.show()
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
scala> val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result: org.apache.spark.sql.DataFrame = [average_salary: double]
scala> result.show()
+--------------+
|average_salary|
+--------------+
| 3750.0|
+--------------+
自定义类型安全的聚合函数
自定义类型安全的聚合函数是通过实现org.apache.spark.sql.expressions.Aggregator
的接口来完成的,一个求平均值的聚合函数示例如下:
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Encoders, Encoder, Row, SparkSession}
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage2 extends Aggregator[Employee, Average, Double] {
/**
* A zero value for this aggregation. Should satisfy the property that any b + zero = b.
* @since 1.6.0
*/
override def zero: Average = Average(0L, 0L)
/**
* Combine two values to produce a new value. For performance, the function may modify `b` and
* return it instead of constructing new object for b.
* @since 1.6.0
*/
override def reduce(b: Average, a: Employee): Average = {
b.sum = b.sum + a.salary
b.count = b.count + 1
b
}
/**
* Transform the output of the reduction.
* @since 1.6.0
*/
override def finish(reduction: Average): Double = {
reduction.sum.toDouble / reduction.count
}
/**
* Merge two intermediate values.
* @since 1.6.0
*/
override def merge(b1: Average, b2: Average): Average = {
b1.sum = b1.sum + b2.sum
b1.count = b1.count + b2.count
b1
}
/**
* Specifies the `Encoder` for the intermediate value type.
* @since 2.0.0
*/
override def bufferEncoder: Encoder[Average] = Encoders.product
/**
* Specifies the `Encoder` for the final ouput value type.
* @since 2.0.0
*/
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
数据源
DataFrame 可以当做标准的RDD进行操作,也可以注册为一个临时表。将DataFrame注册为一个临时表,准许你在上执行SQL查询。
DataFrame接口可以处理多种数据源,SparkSql 也内建若干个有用的数据源格式(Json、parquet、jdbc)。此外,当你用SQL查询的数据源的时候,只使用了一部分字段,SparkSQL可以智能扫描这些字段。
标准的加载和保存函数
在最简单的形式中, 默认数据源(parquet, 除非另有配置 spark.sql.sources.default )将用于所有操作.
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
手工指定选项
你可以通过手工指定数据源和任何想要传递给数据源的选项。指定数据源通常需要使用数据源全名(如org.apache.spark.sql.parquet),但对于内建数据源,你也可以使用它们的短名(json、parquet和jdbc)。并且不同的数据源类型之间都可以相互转换。
示例如下:
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
直接在文件上运行 SQL
不使用读取 API 将文件加载到 DataFrame 并进行查询, 也可以直接用 SQL 查询该文件.
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Save Modes (保存模式)
Save operations (保存操作)可以选择使用 SaveMode , 它指定如何处理现有数据如果存在的话. 重要的是要意识到, 这些 save modes (保存模式)不使用任何 locking (锁定)并且不是 atomic (原子). 另外, 当执行 Overwrite 时, 数据将在新数据写出之前被删除.
Saving to Persistent Tables (保存到持久表)
DataFrames 也可以使用 saveAsTable 命令作为 persistent tables (持久表)保存到 Hive metastore 中. 请注意, existing Hive deployment (现有的 Hive 部署)不需要使用此功能. Spark 将为您创建默认的 local Hive metastore (本地 Hive metastore)(使用 Derby ). 与 createOrReplaceTempView 命令不同, saveAsTable 将 materialize (实现) DataFrame 的内容, 并创建一个指向 Hive metastore 中数据的指针. 即使您的 Spark 程序重新启动, Persistent tables (持久性表)仍然存在, 因为您保持与同一个 metastore 的连接. 可以通过使用表的名称在 SparkSession 上调用 table 方法来创建 persistent tabl (持久表)的 DataFrame .
对于 file-based (基于文件)的 data source (数据源), 例如 text, parquet, json等, 您可以通过 path 选项指定 custom table path (自定义表路径), 例如 df.write.option("path", "/some/path").saveAsTable("t") . 当表被 dropped (删除)时, custom table path (自定义表路径)将不会被删除, 并且表数据仍然存在. 如果未指定自定义表路径, Spark 将把数据写入 warehouse directory (仓库目录)下的默认表路径. 当表被删除时, 默认的表路径也将被删除.
Bucketing, Sorting and Partitioning (分桶, 排序和分区)
对于 file-based data source (基于文件的数据源), 也可以对 output (输出)进行 bucket 和 sort 或者 partition . Bucketing 和 sorting 仅适用于 persistent tables :
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
在使用 Dataset API 时, partitioning 可以同时与 save 和 saveAsTable 一起使用.
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
可以为 single table (单个表)使用 partitioning 和 bucketing:
peopleDF.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("people_partitioned_bucketed")
partitionBy 创建一个 directory structure (目录结构), 如 Partition Discovery 部分所述. 因此, 对 cardinality (基数)较高的 columns 的适用性有限. 相反, bucketBy 可以在固定数量的 buckets 中分配数据, 并且可以在 a number of unique values is unbounded (多个唯一值无界时)使用数据.
Parquet Files
Parquet 格式是被许多其他的数据处理系统支持的列数据格式类型。Spark Sql支持在读写Parquet文件的时候自动保存原始数据的模式信息。在写Parquet文件时候,所有的列将会因为兼容原因转成nullable。
Loading Data Programmatically (以编程的方式加载数据)
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
Partition Discovery (分区发现)
Table partitioning (表分区)是在像 Hive 这样的系统中使用的常见的优化方法. 在 partitioned table (分区表)中, 数据通常存储在不同的目录中, partitioning column values encoded (分区列值编码)在每个 partition directory (分区目录)的路径中. Parquet data source (Parquet 数据源)现在可以自动 discover (发现)和 infer (推断)分区信息. 例如, 我们可以使用以下 directory structure (目录结构)将所有以前使用的 population data (人口数据)存储到 partitioned table (分区表)中, 其中有两个额外的列 gender 和 country 作为 partitioning columns (分区列):
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
通过将 path/to/table 传递给 SparkSession.read.parquet 或 SparkSession.read.load , Spark SQL 将自动从路径中提取 partitioning information (分区信息). 现在返回的 DataFrame 的 schema (模式)变成:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
请注意, 会自动 inferred (推断) partitioning columns (分区列)的 data types (数据类型).目前, 支持 numeric data types (数字数据类型)和 string type (字符串类型).有些用户可能不想自动推断 partitioning columns (分区列)的数据类型.对于这些用例, automatic type inference (自动类型推断)可以由 spark.sql.sources.partitionColumnTypeInference.enabled 配置, 默认为 true .当禁用 type inference (类型推断)时, string type (字符串类型)将用于 partitioning columns (分区列).
从 Spark 1.6.0 开始, 默认情况下, partition discovery (分区发现)只能找到给定路径下的 partitions (分区).对于上述示例, 如果用户将 path/to/table/gender=male 传递给 SparkSession.read.parquet 或 SparkSession.read.load , 则 gender 将不被视为 partitioning column (分区列).如果用户需要指定 partition discovery (分区发现)应该开始的基本路径, 则可以在数据源选项中设置 basePath.例如, 当 path/to/table/gender=male 是数据的路径并且用户将 basePath 设置为 path/to/table/, gender 将是一个 partitioning column (分区列).
Schema Merging (模式合并)
像 ProtocolBuffer , Avro 和 Thrift 一样, Parquet 也支持 schema evolution (模式演进). 用户可以从一个 simple schema (简单的架构)开始, 并根据需要逐渐向 schema 添加更多的 columns (列). 以这种方式, 用户可能会使用不同但相互兼容的 schemas 的 multiple Parquet files (多个 Parquet 文件). Parquet data source (Parquet 数据源)现在能够自动检测这种情况并 merge (合并)所有这些文件的 schemas .
由于 schema merging (模式合并)是一个 expensive operation (相对昂贵的操作), 并且在大多数情况下不是必需的, 所以默认情况下从 1.5.0 开始. 你可以按照如下的方式启用它:
读取 Parquet 文件时, 将 data source option (数据源选项) mergeSchema 设置为 true (如下面的例子所示)
将 global SQL option (全局 SQL 选项) spark.sql.parquet.mergeSchema 设置为 true。
// This is used to implicitly convert an RDD to a DataFrame. import spark.implicits._ // Create a simple DataFrame, store into a partition directory val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square") squaresDF.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube") cubesDF.write.parquet("data/test_table/key=2") // Read the partitioned table val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true)
以上是关于spark知识体系04-SQL,DataFrames,DateSets的主要内容,如果未能解决你的问题,请参考以下文章