:SparkSQL基本操作

Posted 超哥--

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了:SparkSQL基本操作相关的知识,希望对你有一定的参考价值。

系列文章目录

spark第一章:环境安装
spark第二章:sparkcore实例
spark第三章:工程化代码
spark第四章:SparkSQL基本操作


文章目录

前言

接下来我们学习SparkSQL他和Hql有些相似。Hql是将操作装换成MR,SparkSQL也是,不过是使用Spark引擎来操作,效率更高一些


一、添加pom

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.2.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.2.3</version>

以上是这次博客需要的所有依赖,一次性全加上。

二、常用操作


一共这么多,挨个讲解一下

1.类型转换

SparkSQL中有三种常用的类型,RDD之前说过就不说了。
DataFrame
Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。
DSL 语法
DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了

SparkSql_Basic.scala

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame, Dataset, Row, SparkSession

object SparkSql_Basic 
  def main(args: Array[String]): Unit = 
    // 创建SparkSQL的运行环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

//    val df: DataFrame = spark.read.json("datas/user.json")
//    df.show()

    //DataFrame => SQL
//    df.createOrReplaceTempView("user")
//    spark.sql("select age from user").show()


    //DtaFrame => DSL
    // 在使用DataFrame时,如何涉及到转换操作,需要引入转换规则

//    df.select("age","username").show()
//    df.select($"age"+1).show()
//    df.select('age+1).show()

    // DataSet
    // DataFrame 是特定泛型的DataSet
//    val seq: Seq[Int] = Seq(1, 2, 3, 4)
//    val ds: Dataset[Int] = seq.toDS()
//    ds.show()

    // RDD <=>DataFrame
    val rdd=spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",40)))
    val df: DataFrame = rdd.toDF("id", "name", "age")
    val rowRDD: RDD[Row] = df.rdd

    // DataFrame <=> DatsSet
    val ds: Dataset[User] = df.as[User]
    val df1: DataFrame = ds.toDF()

    // RDD <=> DataSet
    val ds1: Dataset[User] = rdd.map 
      case (id, name, age) => 
        User(id, name, age)
      
    .toDS()

    val userRDD: RDD[User] = ds1.rdd


    // 关闭环境
    spark.close()

  

  case class User(id:Int,name:String,age:Int)



2.连接mysql

SparkSQL提供了多种数据接口,我们可以通过JDBC连接Mysql数据库,我们先随便在数据库里边写点东西。

SparkSql_JDBC.scala

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame, SaveMode, SparkSession

object SparkSql_JDBC 
  def main(args: Array[String]): Unit = 
    // 创建SparkSQL的运行环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "user")
      .option("useSSL","false")
      .load()

    df.show

    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "user1")
      .option("useSSL","false")
      .mode(SaveMode.Append)
      .save()

    // 关闭环境
    spark.close()
  

3.UDF函数

这个函数可以对简单的数据进行处理,但是比较局限.
这次我们从json文件读取数据

"username": "zhangsan", "age": 20
"username": "lisi", "age": 30
"username": "wangwu", "age": 40

SparkSql_UDF.scala

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame, Dataset, Row, SparkSession

object SparkSql_UDF 
  def main(args: Array[String]): Unit = 
    // 创建SparkSQL的运行环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read.json("datas/user.json")
    df.createOrReplaceTempView("user")

    spark.udf.register("prefixName",(name:String)=>
      "Name:" + name
    )

    spark.sql("select age ,prefixName(username) from user").show()

    // 关闭环境
    spark.close()
  


4.UDAF函数

UDAF函数的处理能力就比UDF强大多了,可以完成一些更复杂的操作.
SparkSql_UDAF1.scala

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.types.DataType, LongType, StructField, StructType
import org.apache.spark.sql.DataFrame, Encoder, Encoders, Row, SparkSession, functions

object SparkSql_UDAF1 
  def main(args: Array[String]): Unit = 
    // 创建SparkSQL的运行环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    val df: DataFrame = spark.read.json("datas/user.json")
    df.createOrReplaceTempView("user")

    //计算平均年龄
    spark.udf.register("ageAvg", functions.udaf(new MyAvgUDAF()))

    spark.sql("select ageAvg(age) from user").show()


    // 关闭环境
    spark.close()

  

  case class Buff( var total:Long,var count:Long)

  class MyAvgUDAF extends Aggregator[Long,Buff,Long]
    //初始值
    override def zero: Buff = 
      Buff(0L,0L)
    
    //更新缓冲区
    override def reduce(buff: Buff, in: Long): Buff = 
      buff.total=buff.total+in
      buff.count=buff.count+1
      buff
    

    //合并缓冲区
    override def merge(buff1: Buff, buff2: Buff): Buff = 
      buff1.total=buff1.total+buff2.total
      buff1.count=buff1.count+buff2.count
      buff1
    

    //计算结果
    override def finish(buff: Buff): Long = 
      buff.total/buff.count
    

    //缓冲区编码操作
    override def bufferEncoder: Encoder[Buff] = Encoders.product

    //输出的编码操作
    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  




还有一种方法,在Spark3已经不被官方推荐了,所以这里就不叙述了.

5.连接hive

首先我们在集群先,启动Hadoop和Hive
然后将jdbc的jar包放到hive的lib文件中

这个jar包在安装Hive环境时,使用过.
将虚拟机中的hive配置文件,hive-site.xml导出

放到idea的resource文件夹中,然后最好吧target文件夹删除,因为idea有可能从target中直接读取之前的数据,从而没有扫描hive-site.xml

我们就做最简单的查询操作
SparkSql_Hive.scala

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame, SaveMode, SparkSession

object SparkSql_Hive 
  def main(args: Array[String]): Unit = 

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

    spark.sql("show tables").show

    // 关闭环境
    spark.close()

  




如果能查询hive中的数据库,代表成功.

总结

SparkSQL的常用操作基本就这些,至于项目吗,下次专门在写一次吧

sparksql系列 sparksql列操作窗口函数join

一:Sparksql列操作

初始化SparkContext及数据:

import java.util.Arrays

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
import org.apache.spark.sql.functions.concat
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SaveMode
import java.util.ArrayList

object WordCount {

  def initSparkAndData() : DataFrame = {

    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)
    val nameRDD = javasc.parallelize(Arrays.asList("{‘name‘:‘wangwu‘,‘age‘:‘18‘,‘vip‘:‘t‘}",
      "{‘name‘:‘sunliu‘,‘age‘:‘19‘,‘vip‘:‘t‘}","{‘name‘:‘zhangsan‘,‘age‘:‘20‘,‘vip‘:‘f‘}"));
    val namedf = sparkSession.read.json(nameRDD)

    namedf
  }
}

增加列

    val data = initSparkAndData()

    //方法一:可以添加常量值
    data.select(when(col("name").isNotNull,1).otherwise(0) as "usergroup").show(100)   
    //方法二:  只能已经存在的列操作
    data.withColumn("time", concat(col("age"),col("name")) ).show(100)

删除列

    val data = initSparkAndData()
    data.drop("vip").show(100)

二:SparkSql 窗口函数

    传统数据库中就有这个函数,就是partation by () order by ()。那下面让我们看看sparksql中怎么写:

    val data = initSparkAndData()
    data.withColumn("isVsip", row_number().over(Window.partitionBy(col("vip")).orderBy(desc("name")))).show(100)

    上面的意思是按照VIP分组,后按照name排序,作为新的列isVsip。项目中用来作为提取最新记录的函数,举例如下:

    统计出用户最近登录记录:

    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD1 = javasc.parallelize(Arrays.asList("{‘name‘:‘wangwu‘,‘time‘:‘2019-08-12‘}",
      "{‘name‘:‘sunliu‘,‘time‘:‘2019-08-13‘}","{‘name‘:‘zhangsan‘,‘time‘:‘2019-08-14‘}"));
    val namedf1 = sparkSession.read.json(nameRDD1)

    val nameRDD2 = javasc.parallelize(Arrays.asList("{‘name‘:‘wangwu‘,‘time‘:‘2019-09-12‘}",
      "{‘name‘:‘sunliu‘,‘time‘:‘2019-08-13‘}","{‘name‘:‘zhangsan‘,‘time‘:‘2019-07-14‘}"));
    val namedf2 = sparkSession.read.json(nameRDD2)
    //上面全是构造数据。
    namedf1.union(namedf2).withColumn("max_time", row_number().over(Window.partitionBy(col("name")).orderBy(desc("time"))))
      .filter(col("max_time") ===1).show(100)

三:Sparksql join操作

初始化SparkContext及数据:

import java.util.Arrays

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
import org.apache.spark.sql.functions.concat
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SaveMode
import java.util.ArrayList


object WordCount {
  def joinTestData() = {
    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD = javasc.parallelize(Arrays.asList("{‘name‘:‘zhangsan‘,‘age‘:‘18‘,‘sex‘:‘N‘}", "{‘name‘:‘lisi‘,‘age‘:‘19‘,‘sex‘:‘F‘}","{‘‘:‘‘,‘‘:‘‘,‘‘:‘‘}"));
    val nameRDD1 = javasc.parallelize(Arrays.asList("{‘name‘:‘wangwu‘,‘age‘:‘18‘,‘vip‘:‘t‘}", "{‘name‘:‘sunliu‘,‘age‘:‘19‘,‘vip‘:‘t‘}","{‘name‘:‘zhangsan‘,‘age‘:‘18‘,‘vip‘:‘f‘}"));
    val data1 = sparkSession.read.json(nameRDD)
    val data2 = sparkSession.read.json(nameRDD1)

    (data1,data2)
  }
}

left、leftouter、left_outer三者相同

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val left = data1.join(data2,data1("name") === data2("name") ,"left").show(100)

       结果:

age name sex age name vip
null null null null null null
18 zhangsan N 18 zhangsan f
19 lisi f null null null

 

right、rightouter、right_outer三者相同

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val right = data1.join(data2,data1("name") === data2("name") ,"right").show(100)

       结果:

age name sex age name vip
null null null 18 wangwu t
18 zhangsan N 18 zhangsan f
null null null   sunliu t

 

cross、inner两者相同

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val right = data1.join(data2,data1("name") === data2("name") ,"right").show(100)

       结果:

age name sex age name vip
18 zhangsan N 18 zhangsan f

full、fullouter、full_outer、outer四者相同

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val full = data1.join(data2,data1("name") === data2("name") ,"full").show(100)

       结果:

age name sex age name vip
null null null 18 wangwu t
null null null null null null
18 zhangsan N 18 zhangsan f
null null null 19 sunliu t
19 lisi F null null null

 

leftsemi(innerjoin之后只保留左边的)

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val leftsemi = data1.join(data2,data1("name") === data2("name") ,"leftsemi").show(100)

    真正在项目中的使用:项目中有一张大表,主键是用户ID,里面有用户所有基本信息。项目使用过程中一般要求关联大表取得所有基本信息,leftsemi一般用于缩减大表。

       结果:

age name sex
18 zhangsan N

leftanti(innerjoin之后去除能关联上之外的)

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val leftouter = data1.join(data2,data1("name") === data2("name") ,"leftanti").show(100)

       结果:

age name sex
null null null
19 lisi F

以上是关于:SparkSQL基本操作的主要内容,如果未能解决你的问题,请参考以下文章

Spark之SparkSql

大数据学习:Spark SQL入门简介

大数据-spark理论sparkSql,sparkStreaming,spark调优

大数据之Spark:SparkSQL开窗函数实战

spark

入门大数据---SparkSQL联结操作