[Spark2.0]Spark SQL, DataFrames 和Datasets指南

Posted yhao浩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Spark2.0]Spark SQL, DataFrames 和Datasets指南相关的知识,希望对你有一定的参考价值。

综述

        Spark SQLSpark提供的针对结构化数据处理的模块。不同于基本的Spark RDD APISparkSQL提供的接口提供了更多的关于数据和计算执行的信息。在内部,SparkSQL使用这些额外信息完成额外的优化。这里有几种方式可以和SparkSQL相互操作,包括SQLDataset API。计算结果的时候使用相同的执行

        本页中所有示例使用到的样例的数据都包含在Spark发布中,而且都能在spark-shellpyspark或者sparkR中运行。

 

SQL

        Spark SQL的一种用法是执行SQL查询。Spark SQL也可以用于从已安装的Hive中读取数据。更多的关于此特性的配置,请参考Hive Tables。当从内部其他编程语言执行SQL,结果将以Dataset/DataFrame形式返回。你也可以通过command-line或者JDBC/ODBCSQL接口进行交互。

 

DatasetsDataFrames

        Dataset是分布式数据集合。DatasetSpark1.6新增的接口,用以提供RDDs(强类型,有使用强大的lambda函数的能力)的优点和Spark SQL的经优化的执行引擎的优点。Dataset可以从JVM对象进行构造并通过转换函数(如mapflatmapfilter等)进行操作。DatasetAPI支持ScalaJavaPython不支持Dataset API。但因为Python本身的动态性,DatasetAPI的许多优点都已经可用(比如,你可以通过名字很自然的访问一行的某一个字段,如row.columnName),R的情况与此类似。

        DataFrameDataset组织成命名列的形式。它在概念上相当于关系型数据库中的表,或者R/Python中的数据帧,但是在底层进行了更多的优化。DataFrames可以从多种数据源创建,例如:结构化数据文件、Hive中的表、外部数据库或者已存在的RDDsDataFrame API支持ScalaJavaPythonR。在ScalaJavaDataFrame其实是DatasetRowS的形式的表示。在Scala API中,DataFrame仅仅是Dataset[Row]的别名。但在Java中,使用者需要使用Dataset<Row>来表示一个DataFrame

       在本文档中,我们会经常将Scala/Java DatasetRowS作为DataFrame的参考。

 

 

开始使用

起始点:SparkSession

       Spark中所有功能的切入点是SparkSession类。直接使用SparkSession.builder()就可以创建一个基本的SaprkSession


       可以在Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala"找到完整的代码。

        SparkSessionSpark2.0开始提供的内建了对Hive特性的支持,包括使用HiveQL写查询语句、调用Hive UDFs、从Hive表读取数据的能力。你不需要事先部署Hive就能使用这些特性。

 

创建DataFrame

       使用SparkSession,应用可以从已存在的RDDHive表或者Spark数据源创建DataFrame。下面的示例从一个JSON文件创建一个DataFrame:


       可以在Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala"找到完整的代码。

 

无类型的Dataset操作(aka DataFrame Operations

        DataFrameScalaJavaPythonR中为结构化数据操作提供了一个特定领域语言支持。

       就像网文提到的,在Spark2.0中,在ScalaJavaAPI中,DataFrame仅仅是DatasetRowS表示。与Scala/Java中的强类型的“带类型转换操作”相比,这些操作也可以看做“无类型转换操作”。

这里我们提供了一些使用Dataset进行结构化数据处理的基本示例:

       可以在Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala"找到完整的代码。

       可以在Dataset上执行的操作的类型的完整列表可以参考API文档

       除了简单的列引用和表达式外,Dataset同时有丰富的函数库,包括字符串操作、日期算法、常用数学操作等。完整的列表可参考DataFrame Function Reference

 

编程执行SQL查询语句

       Sparksession中的sql函数使得应用可以编程式执行SQL查询语句并且已DataFrame形式返回:

       可以在Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala"找到完整的代码。

 

创建Dataset

        DatasetRDD很像,不同的是它并不使用Java序列化或者Kryo,而是使用特殊的编码器来为网络间的处理或传输的对象进行序列化。对转换一个对象为字节的过程来说编码器和标准系列化器都是可靠的,编码器的代码是自动生成并且使用了一种格式,这种格式允许Spark在不需要将字节解码成对象的情况下执行很多操作,如filteringsortinghashing等。

可以在Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala"找到完整的代码。

 

RDD互操作

        Spark SQL支持两种将已存在的RDD转化为Dataset的方法。第一种方法使用反射推断包含特定类型对象的RDD的结构。这种基于反射的方法代码更加简洁,并且当你在写Spark程序的时候已经知道RDD的结构的情况下效果很好。

       第二种创建Dataset的方法是通过编程接口建立一个结构,然后将它应用于一个存在的RDD。虽然这种方法更加繁琐,但它允许你在运行之前不知道其中的列和对应的类型的情况下构建Dataset

 

使用反射推断结构

        Spark SQLScala接口支持自动的将一个包含case classRDD转换为DataFrame。这个case class定义了表结构。Caseclass的参数名是通过反射机制读取,然后变成列名。Caseclass可以嵌套或者包含像SeqArray之类的复杂类型。这个RDD可以隐式的转换为一个DataFrame,然后被注册为一张表。这个表可以随后被SQLstatement使用。

       可以在Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala"in the Spark repo. 找到完整的代码。

 

以编程方式指定模式

       case class不能被事先定义(比如记录的结构被编码为字符串,或者对不同的用户,文本数据集被不同的解析并进行字段投影),DataFrame可以通过以下3个步骤实现编程创建:

  1. 从原始RDD创建RowS形式的RDD
  2. StructType创建匹配步骤1RowS形式的RDD的模式
  3. 通过SparkSession提供的createDataFrame方法将模式应用于RowS形式的RDD

 

例如:

       可以在Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala"找到完整的代码。

 

 

数据源

        Spark SQL通过DataFrame接口,可以支持对多种数据源的操作。DataFrame可以使用关系转换来进行操作,而且可以用来创建临时视图。将DataFrame注册为临时视图可以允许你在数据上运行SQL查询语句。本节讲解使用SparkData Source加载数据和保存数据的通用方法,然后

详细讲述内部支持的数据源可用的特定操作。

 

通用Load/Save函数

       最简单的,默认的数据源(parquet,除非使用spark.sql.sources.default进行了配置)将被用于所有的操作。

       可以在Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"找到完整的代码。

 

手动指定选项

       你可以手动指定数据源以及数据源附带的额外选项。数据源被他们的完全限定名来指定(如,org.apache.spark.sql.parquet),但对于内部支持的数据源,你可以使用短名(jsonparquetjdbc)。DataFrame可以使用这种语法从任何可以转换为其他类型的数据源加载数据。

       可以在Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"找到完整的代码。

 

在文件上直接执行SQL

       除了使用读取API加载一个文件到SATAFrame然后查询它的方式,你同样可以通过SQL直接查询文件。

       可以在Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"找到完整的代码。

 

保存模式

       保存操作可选SaveMode,它指定了如何处理现有的数据。需要重视的一点是这些保存模式没有使用任何的锁,并且不具有原子性。此外,当执行Overwrite时,数据将先被删除,然后写出新数据。

Scala/Java

其他语言

含义

SaveMode.ErrorIfEcists(默认)

error”(默认)

保存DataFrame到数据源时,如果数据已经存在,将抛出一个异常。

SaveMode.Append

append

保存DataFrame到数据源时,如果数据/表存在时,DataFrame的内容将追加到已存在的数据后。

SaveMode.Overwrite

overwrite

Overwrite模式意味着当保存一个DataFrame到数据源时,如果数据/表已经存在,存在的数据将会被DataFrame的内容覆盖。

SaveMode.Ignore

ignore

Ignore模式意味着当保存一个DataFrame到数据源时,如果数据已经存在,保存操作将不会保存DataFrame的内容,并且不会改变原数据。这与SQL中的CREATE TABLE IF NOT EXISTS相似。

 

保存到持久化表

       也可以通过saveAsTable命令将DataFrame作为持久化表保存到Hive元数据库中。注意使用此特性时不需要事先部署HiveSpark将为你创建一个默认的本地Hive元数据库(使用Derby)。不同于createOrReplaceTempView命令,saveAsTable将具体化DataFrame的内容并且在Hive元数据库中创建一个指向数据的指针。在你保持你的连接是到相同的元数据库时,当你的Spark程序重启后持久化表依然会存在。通过在SparkSession上使用表名调用table命令,可以创建用于持久化表的DataFrame

       默认的saveAsTable将会创建一个托管表,意味着数据的位置酱油元数据库控制。托管表也有他们自己的数据,当对应的表被删除时这些数据会一并删除。

 

Parquet文件

       Parquet是一种被很多其他数据处理系统支持的列式文件。Spark SQL提供了可以自动保存原始数据模式的对Parquet文件读取和写入的操作。当写入一个Parquet文件时,因为兼容性原因,所有的列都会自动转换为nullable(可为空的)。

 

编程式加载数据

       使用上面例子的数据:

       可以从Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"找到完整的代码。

 

分区发现

       表分区是Hive等系统中常用的优化方法。在一个分区表中,数据常常存放在不同的目录中,根据分区列的值的不同,编码了每个分区目录不同的路径。目前parquet数据源已经可以自动的发现和推断分区信息。例如,我们可以用下面的目录结构存储所有我们以前经常使用的数据到分区表,只需要额外的添加两个列gendercountry作为分区列:

       使用SparkSession.read.parquet或者SparkSession.read.load加载path/to/table后,Spark SQL能够自动的从路径中提取分区信息。返回的DataFrame的模式结构是:

       注意分区列的数据类型是自动推断的。目前支持数值型数据和字符串型数据。有时候用户并不想自动推断分区列的数据类型,这种情况下,可以通过配置spark.sql.sources.partitionColumnTypeInference.enabled这个参数来配置自动类型推断,默认情况下是true。当关闭类型推断后,分区列的类型将为字符串型。

       Spark1.6.0开始,在默认情况下,只在给定的路径下进行分区发现。在上述的例子中,如果用户将path/to/table/gender=male传给SparkSession.read.parquet或者SparkSession.read.loadgender将会被认为是分区列。如果用户需要指定分区发现开始的基础路径,可以将basePath设置到数据源选项。例如,当path/to/table/gender=male是数据的路径,并且用户设置basePathpath/to/tablegender将作为分区列。

 

模式(schema)合并

       ProtocolBufferAvro,和Thrift类似,Parquet同样支持schema的演变。用户可以以一个简单点的schema开始,然后在需要时逐渐的添加更多列到schema。使用这种方法,用户将最终得到由不同的但是相互兼容的schema构成的多个Parquet文件。Parquet数据源目前可以自动的检测这种情况并且合并这些文件的schema

       由于合并schema是相对代价较大的操作,而且在大多数情况下并不需要这样,从1.5.0开始我们默认将它关闭,你可以通过以下方法使它生效:

  1. 在读取Parquet文件时(就像下面的例子)设置数据源操作mergeSchematrue
  2. 设置全局SQL选项spark.sql.parquet.mergeSchematrue

可以在Spark仓库的"examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"找到完整的代码

 

Hive原数据

       当读写Hive元存储Parquet表时,为了更好的性能,SparkSQL将试图使用它自己支持的Parquet代替Hive SerDe。这种行为可以通过spark.sql.hive.convertMetastoreParquet进行配置,默认已经开启。

 

spark2.0 连接mysql8.0数据库操作表数据

spark提交异常日志分析

学习Spark2.0中的Structured Streaming

CDH5.12.0 如何升级到Spark2.0 版本

spark2.0的10个特性介绍

cloudera上面安装Spark2.0