SparkSQL - 介绍及使用 ScalaJavaPython 三种语言演示

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL - 介绍及使用 ScalaJavaPython 三种语言演示相关的知识,希望对你有一定的参考价值。

一、SparkSQL

前面的文章中使用 RDD 进行数据的处理,优点是非常的灵活,但需要了解各个算子的场景,需要有一定的学习成本,而 SQL 语言是一个大家十分熟悉的语言,如果可以通过编写 SQL 而操作RDD,学习的成本便会大大降低,在大数据领域 SQL 已经是数一个非常重要的范式,在 Hadoop 生态圈中,我们可以通过 Hive 进而转换成 MapReduces 进行数据分析,在后起之秀的 Flink 中也有 FlinkSQL 来简化数据的操作。

SparkSQL 可以理解成是将 SQL 解析成:RDD + 优化 再执行。

SparkSQL 对比 Hive

SparkSQLHive
计算方式基于 RDD 在内存计算转化为 MapReduces 需要磁盘IO读写
计算引擎SparkMR、Spark、Tez
性能
元数据无自身的元数据,可以与Hive metastore连接Hive metastore
缓存表支持不支持
视图支持支持
ACID不支持支持(hive 0.14)
分区支持支持
分桶支持支持

SparkSQL 的适用场景

数据类型说明
结构化数据有固定的 Schema ,例如:关系型数据库的表
半结构化数据没有固定的 Schema,但是有结构,数据一般是自描述的,例如:JSON 数据

理解 DataFrame 和 DataSet

SparkSQL的数据抽象是 DataFrameDataSet ,底层都是RDD

DataFrame 可以理解为是一个分布式表,包括:RDD - 泛型 + Schema约束(指定了字段名和类型) + SQL操作 + 优化

DataSetDataFrame 的基础上增加了泛型的概念。

例如:有文本数据,读取为 RDD 后,可以拥有如下数据:

1小明110@qq.com
2小张120@qq.com
3小王130@qq.com

如果转化为 DataFrame ,那就就可以拥有下面数据:

ID:bigint姓名:String邮箱:String
1小明110@qq.com
2小张120@qq.com
3小王130@qq.com

如果转化为 DataSet ,那就就可以拥有下面数据:

ID:bigint姓名:String邮箱:String泛型
1小明110@qq.comuser
2小张120@qq.comuser
3小王130@qq.comuser

DataSetDataFrame还是有挺大区别的,DataFrame开发都是写SQL,但是DataSet可以使用类似RDDAPI。也可以理解成DataSet就是存了个数据类型的RDD

二、通过 RDD 使用SparkSQL

如果是使用 ScalaJava 语言开发,需要引入 SparkSQL 的依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.1</version>
</dependency>

假如现在有如下文本文件,分别对应含义为:ID、名称、年龄、邮箱

1 小明 20 110.@qq.com
2 小红 29 120.@qq.com
3 李四 25 130.@qq.com
4 张三 30 140.@qq.com
5 王五 35 150.@qq.com
6 赵六 40 160.@qq.com

下面还是使用前面文章的方式读取文本为 RDD ,不过不同的是,我们将 RDD 转为 DataFrame 使用 SQL 的方式处理:

  • Scala:
object SQLRddScala 

  case class User(id: Int, name: String, age: Int, email: String)

  def main(args: Array[String]): Unit = 
    //声明 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("sparksql")
      .master("local[*]")
      .getOrCreate()
    //通过 SparkSession 获取 SparkContext
    val sc = spark.sparkContext
    //读取文件为 RDD
    val text = sc.textFile("D://test/input1/")
    //根据空格拆分字段
    val rdd = text.map(_.split(" ")).map(s => User(s(0).toInt, s(1), s(2).toInt, s(3)))
    //转化为 DataFrame,并指定 Schema
    val dataFrame = spark.createDataFrame(rdd)
    //打印 Schema
    dataFrame.printSchema()
    //查看数据
    dataFrame.show()
    //DSL 风格查询
    dataFrame.select("id","name").filter("age >= 30").show()
    //SQL 风格
    //注册表
    dataFrame.createOrReplaceTempView("user")
    //执行 SQL 语言
    spark.sql("select * from user where age >= 30").show()
    //关闭资源
    spark.stop()
  


  • Java:
public class SQLRddJava 

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class User 
        private Integer id;
        private String name;
        private Integer age;
        private String email;
    

    public static void main(String[] args) 
        // 声明 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .appName("sparksql")
                .master("local[*]")
                .getOrCreate();
        //  通过 SparkSession 获取 SparkContext
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        // 读取文件为 RDD
        JavaRDD<String> text = sc.textFile("D://test/input1/");
        //根据空格拆分字段
        JavaRDD<User> rdd = text.map(s -> s.split(" ")).map(s -> new User(Integer.parseInt(s[0]), s[1], Integer.parseInt(s[2]), s[3]));
        //转化为 DataFrame,并指定 Schema
        Dataset<Row> dataFrame = spark.createDataFrame(rdd, User.class);
        //打印 Schema
        dataFrame.printSchema();
        // 查看数据
        dataFrame.show();
        //DSL 风格查询
        dataFrame.select("id","name").filter("age >= 30").show();
        // SQL 风格
        dataFrame.createOrReplaceTempView("user");
        // 注册表
        spark.sql("select * from user where age >= 30").show();
        // 执行 SQL 语言
        spark.stop();
    


  • Python:
from pyspark.sql import SparkSession
import findspark

if __name__ == '__main__':
    findspark.init()
    # 声明 SparkSession
    spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
    # 通过 SparkSession 获取 SparkContext
    sc = spark.sparkContext
    # 读取文件为 RDD
    text = sc.textFile("D:/test/input1/")
    # 根据空格拆分字段
    rdd = text.map(lambda s: s.split(" "))
    # 转化为 DataFrame,并指定 Schema
    dataFrame = spark.createDataFrame(rdd, ["id", "name", "age", "email"])
    # 打印 Schema
    dataFrame.printSchema()
    # 查看数据
    dataFrame.show()
    # DSL 风格查询
    dataFrame.select(["id","name"]).filter("age >= 30").show()
    # SQL 风格
    # 注册表
    dataFrame.createOrReplaceTempView("user")
    # 执行 SQL 语言
    spark.sql("select * from user where age >= 30").show()

    #关闭资源
    spark.stop()

打印的 Schema 信息:

全部数据内容:


DSL 查询结果:


SQL 查询结果:

三、多数据源交互

SparkSession 中可以通过: spark.read.格式(路径) 的方式, 获取 SparkSQL 中的外部数据源访问框架 DataFrameReaderDataFrameReader 有两种访问方式,一种是使用 load 方法加载,使用 format 指定加载格式, 还有一种是使用封装方法, 类似 csv, json, jdbc 等,这两种方式本质上一样,都是 load 的封装。

注意:如果使用 load 方法加载数据, 但是没有指定 format 的话, 默认是按照 Parquet 文件格式读取。

对于写数据SparkSQL 中增加了一个新的数据写入框架 DataFrameWriter ,同样也有两种使用方式,一种是使用 format 配合 save,还有一种是使用封装方法,例如 csv, json, saveAsTable 等,参数如下:

组件说明
source写入目标, 文件格式等, 通过 format 方法设定
mode写入模式, 例如一张表已经存在, 如果通过 DataFrameWriter 向这张表中写入数据, 是覆盖表呢, 还是向表中追加呢? 通过 mode 方法设定
extraOptions外部参数, 例如 JDBC 的 URL, 通过 options, option 设定
partitioningColumns类似 Hive 的分区, 保存表的时候使用, 这个地方的分区不是 RDD 的分区, 而是文件的分区, 或者表的分区, 通过 partitionBy 设定
bucketColumnNames类似 Hive 的分桶, 保存表的时候使用, 通过 bucketBy 设定
sortColumnNames用于排序的列, 通过 sortBy 设定

其中一个很重要的参数叫做 mode,表示指定的写入模式,可以传入Scala 对象表示或字符串表示,有如下几种方式:

Scala 对象表示字符串表示说明
SaveMode.ErrorIfExists“error”将 DataFrame 保存到 source 时, 如果目标已经存在, 则报错
SaveMode.Append“append”将 DataFrame 保存到 source 时, 如果目标已经存在, 则添加到文件或者 Table 中
SaveMode.Overwrite“overwrite”将 DataFrame 保存到 source 时, 如果目标已经存在, 则使用 DataFrame 中的数据完全覆盖目标
SaveMode.Ignore“ignore”将 DataFrame 保存到 source 时, 如果目标已经存在, 则不会保存 DataFrame 数据, 并且也不修改目标数据集, 类似于 CREATE TABLE IF NOT EXISTS

注意:如果没有指定 format, 默认的 formatParquet

1. 读写 CSV 格式

准备 CSV 文件:

  • Scala:
object SQLCSV 
  def main(args: Array[String]): Unit = 
    //声明 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("sparksql")
      .master("local[*]")
      .getOrCreate()

    //读取 CSV
    val csv = spark
      .read
      .schema("id int, name string, age int, email string")
      .option("header", "true") //第一行为标题
      .csv("D:/test/input1/test.csv")
    csv.printSchema()
    csv.show()
    // SQL 操作
    csv.createOrReplaceTempView("csv")
    spark.sql("select * from csv where age >= 30").show()
    //如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
    val csv1 = spark
      .read
      .schema("id int, name string, age int, email string")
      .option("delimiter", " ")
      .csv("D:/test/input1/test.txt")
    csv1.printSchema()
    csv1.show()
    //写出CSV文件
    csv1.write.mode(SaveMode.Overwrite).json("D:/test/output")
    //写出查询结果
    spark.sql("select * from csv where age <= 30")
      .write.mode(SaveMode.Overwrite).csv("D:/test/output1")
    spark.stop()
  

  • Java:
public class SQLCSVJava 
    public static void main(String[] args) 
        SparkSession spark = SparkSession
                .builder()
                .appName("sparksql")
                .master("local[*]")
                .getOrCreate();
        //读取 CSV
        Dataset<Row> csv = spark.read()
                .schema("id int, name string, age int, email string")
                .option("header", "true") //第一行为标题
                .csv("D:/test/input1/test.csv");
        csv.printSchema();
        csv.show();
        // SQL 操作
        csv.createOrReplaceTempView("csv");
        spark.sql("select * from csv where age >= 30").show();
        //如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
        Dataset<Row> csv1 = spark.read()
                .schema("id int, name string, age int, email string")
                .option("delimiter", " ") //第一行为标题
                .csv("D:/test/input1/test.txt");
        csv1.printSchema();
        csv1.show();
        //写出CSV文件
        csv1.write().mode(SaveMode.Overwrite).json("D:/test/output");
        //写出查询结果
        spark.sql("select * from csv where age <= 30")
                .write().mode(SaveMode.Overwrite).csv("D:/test/output1");

        spark.close();
    

  • Python:
from pyspark.sql import SparkSession,DataFrameWriter
import findspark

if __name__ == '__main__':
    findspark.init()
    # 声明 SparkSession
    spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()
    # 读取 CSV
    csv = spark.read \\
      .schema("id int, name string, age int, email string") \\
      .option("header", "true") \\
      .csv("D:/test/input1/test.csv")
    csv.printSchema()
    csv.show()
    #  SQL 操作
    csv.createOrReplaceTempView("csv")
    spark.sql("select * from csv where age >= 30").show()
    # 如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符
    csv1 = spark.read \\
      .schema("id int, name string, age int, email string") \\
      .option("delimiter", " ") \\
      .csv("D:/test/input1/test.txt")
    csv1.printSchema()
    csv1.show()
    # 写出CSV文件
    csv1.write.mode("overwrite").json("D:/test/output")
    # 写出查询结果
    spark.sql("select * from csv where age <= 30").write.mode("overwrite").csv("D:/test/output1")
    #关闭资源
    spark.stop()

存储的 csv :

2. 读写Parquet 格式文件

先将上面 csv 文件转为 Parquet 文件:

//读取 CSV
Dataset<Row> csv = spark.read()
        .schema("id int, name string, age int, email string")
        .option("header", "true") //第一行为标题
        .csv("D:/test/input1/test.csv");
// 转化为 Parquet 文件
csv.write().mode(SaveMode.Overwrite).parquet("D:/test/output3");


将该文件名修改为 test.parquet 方便下面测试:

读取 Parquet 格式文件

  • Scala:
object SQLParquet 
  def main(args: Array[String]): Unit = 
    //声明 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("sparksql")
      .master("local[*]")
      .getOrCreate()

    //读取 parquet
    val parquet = spark.read.parquet("D:/test/output3/test.parquet")
    parquet.printSchema()
    parquet.show()
    // SQL 操作
    parquet.createOrReplaceTempView("parquet")
    spark.sql("select * from parquet where age >= 30").show()
    //写入 Parquet 的时候指定分区
    parquet.write.mode(SaveMode.Overwrite).partitionBy("age").csv("D:/test/output5")

    spark.stop()
  

  • Java
public class SQLParquetJava 
    public static void main(String[] args) 
        SparkSession spark = SparkSession
                .builder()
                .appName("sparksql")
                .master("local[*]")
                .getOrCreate();
        //读取 parquet 
        Dataset<Row> parquet = spark.read().parquet("D:/test/output3/test.parquet");
        parquet.printSchema();
        parquet.show();
        // SQL 操作
        parquet.createOrReplaceTempView("parquet");
        spark.sql("select * from parquet where age >= 30").show();
        //写入 Parquet 的时候指定分区
        parquet.write().mode(SaveMode.Overwrite).partitionBy("age").parquet("D:/test/output5");

        spark.close();
    

  • Python:
from pyspark.sql import SparkSession,DataFrameWriter
import findspark

if __name__ == '__main__':
    findspark.init()
    # 声明 SparkSession
    spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()

    #  读取 parquet
    parquet = spark.read.parquet("D:/test/output3/test.parquet")
    parquet.printSchema()
    parquet.show()
    #  SQL操作
    parquet.createOrReplaceTempView("parquet")
    spark.sql("select * from parquet where age >= 30").show()
    # 写入Parquet的时候指定分区
    parquet.write.mode("overwrite")Spark SQL的介绍和DataFrame的建立及使用

教程:Apache Spark SQL入门及实践指南!

oracleSQL 转 SPARKSQL(hiveSql) 及常用优化

oracleSQL 转 SPARKSQL(hiveSql) 及常用优化

SparkSQL介绍与Hive整合Spark的th/beeline/jdbc/thriftserve2shell方式使用SQL

SparkSpark SQL 物化视图技术原理与实践