spark sql - Dataset数据类型
Posted nefu-ljw
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark sql - Dataset数据类型相关的知识,希望对你有一定的参考价值。
spark sql - Dataset数据类型
以下内容翻译于spark sql Dataset类源码的注释:org.apache.spark.sql.Dataset
spark sql 2.11
Dataset是特定领域对象的强类型集合,可以使用函数或关系操作并行转换。 每个Dataset还有一个无类型的视图,称为DataFrame,它是一个Dataset of Row。DataFrame = Dataset[Row]
。
Dataset上可用的操作分为转换(transformation)和操作(action)。 转换是产生新数据集的,动作是触发计算并返回结果的。 示例转换包括 map、filter、select 和 aggregate (groupBy)。示例操作包括count、show或将数据写入文件系统。
Dataset是“惰性的”,即仅在调用操作时才触发计算。在内部,数据集代表一个逻辑计划,它描述了生成数据所需的计算。当一个动作被调用时,Spark 的查询优化器优化逻辑计划并生成物理计划以并行和分布式的方式高效执行。要探索逻辑计划以及优化的物理计划,请使用explain功能。
为了有效地支持特定领域的对象,需要一个编码器(Encoder)。编码器将域特定类型 T 映射到 Spark 的内部类型系统。 例如,给定一个类 Person,它有两个字段,name (string) 和 age (int),编码器用于告诉 Spark 在运行时生成代码以将 Person 对象序列化为二进制结构。这种二进制结构通常具有低得多的内存占用,并且针对数据处理效率进行了优化(例如,以柱状格式)。要了解数据的内部二进制表示,请使用 schema 函数。
通常有两种创建Dataset的方法。最常见的方法是使用 SparkSession 上可用的读取功能,将 Spark 指向存储系统上的某些文件。
val people = spark.read.parquet("...").as[Person]
Dataset也可以通过现有数据集上可用的转换来创建。 例如,以下通过对现有数据集应用过滤器(filter)来创建新数据集:
val names = people.map(_.name) // names是一个Dataset[String]
Dataset操作也可以是无类型的,通过定义在数据集(这个类)、列和函数中的各种特定领域语言 (DSL) 函数。 这些操作与 R 或 Python 中数据框抽象中可用的操作非常相似。
要从Dateset中选择列,请使用 Scala 中的 apply 方法。
val ageCol = people("age")
请注意,Column 类型也可以通过其各种函数进行操作。
// 下面创建一个新列,将每个人的年龄增加 10。
people("age") + 10
// Scala 中的一个更具体的例子
// 使用 SparkSession 创建 Dataset[Row]
val people = spark.read.parquet("...")
val department = spark.read.parquet("...")
people.filter("age > 30")
.join(department, people("deptId") === department("id"))
.groupBy(department("name"), people("gender"))
.agg(avg(people("salary")), max(people("age")))
以上是关于spark sql - Dataset数据类型的主要内容,如果未能解决你的问题,请参考以下文章