SparkSQL Basic Operations
Series Articles
Spark Chapter 1: Environment Installation
Spark Chapter 2: SparkCore Examples
Spark Chapter 3: Engineering the Code
Spark Chapter 4: SparkSQL Basic Operations
Preface
Next we will learn SparkSQL, which is similar to HQL. HQL translates its operations into MapReduce jobs; SparkSQL performs the same kind of translation but runs on the Spark engine, which makes it more efficient.
I. Add the POM Dependencies
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.2.3</version>
</dependency>
These are all the dependencies this post needs; add them all at once.
II. Common Operations
There are quite a few operations; let's go through them one by one.
1. Type Conversions
SparkSQL has three commonly used data abstractions. RDD was covered earlier, so we won't repeat it here.
DataFrame
Spark SQL's DataFrame API allows us to work with DataFrames without having to register temporary tables or build SQL expressions. The DataFrame API offers both transformation and action operations.
DSL Syntax
DataFrame provides a domain-specific language (DSL) for manipulating structured data. The DSL can be used from Scala, Java, Python and R, and in the DSL style there is no need to create temporary views.
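As a quick taste of the DSL style, here is a minimal hedged sketch; it assumes an existing SparkSession named spark and the datas/user.json file that the full program below also uses.
// Minimal DSL sketch (assumes an existing SparkSession `spark` and the datas/user.json file)
import spark.implicits._
val users = spark.read.json("datas/user.json")
users.filter($"age" > 20)              // no createOrReplaceTempView needed
  .select($"username", $"age" + 1)     // column expressions via the $ interpolator
  .show()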
SparkSql_Basic.scala
package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object SparkSql_Basic {
  def main(args: Array[String]): Unit = {
    // Create the SparkSQL runtime environment
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    // val df: DataFrame = spark.read.json("datas/user.json")
    // df.show()

    // DataFrame => SQL
    // df.createOrReplaceTempView("user")
    // spark.sql("select age from user").show()

    // DataFrame => DSL
    // If the DataFrame DSL involves conversions, the implicit conversion rules (spark.implicits._) must be imported
    // df.select("age", "username").show()
    // df.select($"age" + 1).show()
    // df.select('age + 1).show()

    // DataSet
    // A DataFrame is just a DataSet with a specific generic type: DataSet[Row]
    // val seq: Seq[Int] = Seq(1, 2, 3, 4)
    // val ds: Dataset[Int] = seq.toDS()
    // ds.show()

    // RDD <=> DataFrame
    val rdd = spark.sparkContext.makeRDD(List((1, "zhangsan", 30), (2, "lisi", 40)))
    val df: DataFrame = rdd.toDF("id", "name", "age")
    val rowRDD: RDD[Row] = df.rdd

    // DataFrame <=> DataSet
    val ds: Dataset[User] = df.as[User]
    val df1: DataFrame = ds.toDF()

    // RDD <=> DataSet
    val ds1: Dataset[User] = rdd.map {
      case (id, name, age) =>
        User(id, name, age)
    }.toDS()
    val userRDD: RDD[User] = ds1.rdd

    // Close the environment
    spark.close()
  }

  case class User(id: Int, name: String, age: Int)
}
2. Connecting to MySQL
SparkSQL provides a number of data source interfaces; through JDBC we can connect to a MySQL database. First, put a bit of test data into the database.
SparkSql_JDBC.scala
package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SparkSql_JDBC {
  def main(args: Array[String]): Unit = {
    // Create the SparkSQL runtime environment
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    // Read the `user` table over JDBC
    val df: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "user")
      .option("useSSL", "false")
      .load()
    df.show

    // Append the same data into a `user1` table
    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "user1")
      .option("useSSL", "false")
      .mode(SaveMode.Append)
      .save()

    // Close the environment
    spark.close()
  }
}
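Besides loading a whole table with the dbtable option, Spark's JDBC source also accepts a query option that pushes a SQL statement down to MySQL. Below is a small hedged sketch of that variant, reusing the same connection settings as above and assuming the user table has an age column (query and dbtable cannot be used together).
// Hedged sketch: push a SQL query down to MySQL instead of reading the whole table
val filtered: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "000000")
  .option("query", "select * from user where age > 18")  // replaces the dbtable option; assumes an `age` column exists
  .option("useSSL", "false")
  .load()
filtered.show()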
3. UDF Functions
A UDF processes values one row at a time, which keeps it simple but fairly limited.
This time we read the data from a JSON file:
"username": "zhangsan", "age": 20
"username": "lisi", "age": 30
"username": "wangwu", "age": 40
SparkSql_UDF.scala
package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object SparkSql_UDF {
  def main(args: Array[String]): Unit = {
    // Create the SparkSQL runtime environment
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read.json("datas/user.json")
    df.createOrReplaceTempView("user")

    // Register a UDF that prefixes every username with "Name:"
    spark.udf.register("prefixName", (name: String) => {
      "Name:" + name
    })
    spark.sql("select age, prefixName(username) from user").show()

    // Close the environment
    spark.close()
  }
}
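The same logic can also be used in DSL style without registering the UDF under a name. Below is a small hedged sketch reusing the df defined above; functions.udf wraps an ordinary Scala function into a column expression.
// Hedged DSL sketch: wrap the Scala function with functions.udf and apply it as a column expression
import org.apache.spark.sql.functions.{col, udf}
val prefixName = udf((name: String) => "Name:" + name)
df.select(col("age"), prefixName(col("username")).as("username")).show()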
4. UDAF Functions
A UDAF is considerably more powerful than a UDF: it aggregates across many rows, so it can implement more complex logic, such as the average age computed below.
SparkSql_UDAF1.scala
package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, Row, SparkSession, functions}

object SparkSql_UDAF1 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSQL runtime environment
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    val df: DataFrame = spark.read.json("datas/user.json")
    df.createOrReplaceTempView("user")

    // Compute the average age with a custom strongly typed aggregator
    spark.udf.register("ageAvg", functions.udaf(new MyAvgUDAF()))
    spark.sql("select ageAvg(age) from user").show()

    // Close the environment
    spark.close()
  }

  // Aggregation buffer: running sum of ages and row count
  case class Buff(var total: Long, var count: Long)

  class MyAvgUDAF extends Aggregator[Long, Buff, Long] {
    // Initial value of the buffer
    override def zero: Buff = {
      Buff(0L, 0L)
    }

    // Update the buffer with one input value
    override def reduce(buff: Buff, in: Long): Buff = {
      buff.total = buff.total + in
      buff.count = buff.count + 1
      buff
    }

    // Merge two buffers from different partitions
    override def merge(buff1: Buff, buff2: Buff): Buff = {
      buff1.total = buff1.total + buff2.total
      buff1.count = buff1.count + buff2.count
      buff1
    }

    // Compute the final result
    override def finish(buff: Buff): Long = {
      buff.total / buff.count
    }

    // Encoder for the buffer
    override def bufferEncoder: Encoder[Buff] = Encoders.product

    // Encoder for the output
    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  }
}
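Because MyAvgUDAF extends the strongly typed Aggregator API, it can also be applied without SQL through the typed Dataset DSL. The following is a hedged sketch that reuses the df from main; toColumn turns the aggregator into a TypedColumn.
// Hedged typed-DSL sketch: select the age column as a Dataset[Long] and apply the aggregator directly
import spark.implicits._
val ages = df.select($"age".as[Long])
ages.select(new MyAvgUDAF().toColumn).show()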
There is another, older way to write UDAFs (the untyped UserDefinedAggregateFunction API), but it is no longer recommended in Spark 3, so it is not covered here.
5. Connecting to Hive
First start Hadoop and Hive on the cluster.
Then copy the MySQL JDBC driver jar into Hive's lib directory; this is the same jar that was used when setting up the Hive environment.
Export the Hive configuration file hive-site.xml from the virtual machine and put it into the project's resources folder in IDEA. It is also best to delete the target folder, because IDEA may read stale build output from target and never pick up hive-site.xml.
We will just do the simplest query operation.
SparkSql_Hive.scala
package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SparkSql_Hive {
  def main(args: Array[String]): Unit = {
    // Create the SparkSQL runtime environment with Hive support enabled
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

    spark.sql("show tables").show

    // Close the environment
    spark.close()
  }
}
If the query returns the tables in your Hive database, the connection works.
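From here, ordinary queries work the same way; the line below is a hedged sketch against a hypothetical Hive table named user_info (not part of this tutorial's data).
// Hedged sketch: user_info is a hypothetical table name, substitute one of your own Hive tables
spark.sql("select * from user_info limit 10").show()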
Summary
These are the essential SparkSQL operations. As for a full project, I will write a dedicated post for that next time.
SparkSQL Series: Column Operations, Window Functions, and Joins
I: SparkSQL Column Operations
Initialize the SparkContext and the data:
import java.util.Arrays
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
import org.apache.spark.sql.functions.concat
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SaveMode
import java.util.ArrayList
object WordCount {
def initSparkAndData() : DataFrame = {
val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)
val nameRDD = javasc.parallelize(Arrays.asList("{'name':'wangwu','age':'18','vip':'t'}",
"{'name':'sunliu','age':'19','vip':'t'}","{'name':'zhangsan','age':'20','vip':'f'}"));
val namedf = sparkSession.read.json(nameRDD)
namedf
}
}
Adding a column
val data = initSparkAndData()
//Method 1: can also add a constant / derived value
data.select(when(col("name").isNotNull,1).otherwise(0) as "usergroup").show(100)
//Method 2: withColumn can only work with columns that already exist
data.withColumn("time", concat(col("age"),col("name")) ).show(100)
Dropping a column
val data = initSparkAndData()
data.drop("vip").show(100)
II: SparkSQL Window Functions
Traditional databases already have this kind of function: partition by () order by (). Here is how to write the same thing in SparkSQL:
val data = initSparkAndData()
data.withColumn("isVsip", row_number().over(Window.partitionBy(col("vip")).orderBy(desc("name")))).show(100)
The line above partitions the data by vip, orders each partition by name descending, and stores the resulting row number in a new column isVsip. In projects this pattern is typically used to pick out the latest record per key, for example:
Find each user's most recent login record:
val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)
val nameRDD1 = javasc.parallelize(Arrays.asList("{'name':'wangwu','time':'2019-08-12'}",
"{'name':'sunliu','time':'2019-08-13'}","{'name':'zhangsan','time':'2019-08-14'}"));
val namedf1 = sparkSession.read.json(nameRDD1)
val nameRDD2 = javasc.parallelize(Arrays.asList("{'name':'wangwu','time':'2019-09-12'}",
"{'name':'sunliu','time':'2019-08-13'}","{'name':'zhangsan','time':'2019-07-14'}"));
val namedf2 = sparkSession.read.json(nameRDD2)
//Everything above just builds the test data.
namedf1.union(namedf2).withColumn("max_time", row_number().over(Window.partitionBy(col("name")).orderBy(desc("time"))))
.filter(col("max_time") ===1).show(100)
III: SparkSQL Join Operations
Initialize the SparkContext and the data:
import java.util.Arrays
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
import org.apache.spark.sql.functions.concat
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SaveMode
import java.util.ArrayList
object WordCount {
def joinTestData() = {
val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
val javasc = new JavaSparkContext(sparkSession.sparkContext)
val nameRDD = javasc.parallelize(Arrays.asList("{'name':'zhangsan','age':'18','sex':'N'}", "{'name':'lisi','age':'19','sex':'F'}","{'':'','':'','':''}"));
val nameRDD1 = javasc.parallelize(Arrays.asList("{'name':'wangwu','age':'18','vip':'t'}", "{'name':'sunliu','age':'19','vip':'t'}","{'name':'zhangsan','age':'18','vip':'f'}"));
val data1 = sparkSession.read.json(nameRDD)
val data2 = sparkSession.read.json(nameRDD1)
(data1,data2)
}
}
left, leftouter and left_outer are all the same
val dataTuple = joinTestData()
val data1 = dataTuple._1
val data2 = dataTuple._2
val left = data1.join(data2,data1("name") === data2("name") ,"left").show(100)
Result:
age | name | sex | age | name | vip |
null | null | null | null | null | null |
18 | zhangsan | N | 18 | zhangsan | f |
19 | lisi | F | null | null | null |
right, rightouter and right_outer are all the same
val dataTuple = joinTestData()
val data1 = dataTuple._1
val data2 = dataTuple._2
val right = data1.join(data2,data1("name") === data2("name") ,"right").show(100)
Result:
age | name | sex | age | name | vip |
null | null | null | 18 | wangwu | t |
18 | zhangsan | N | 18 | zhangsan | f |
null | null | null | 19 | sunliu | t |
cross and inner are the same
val dataTuple = joinTestData()
val data1 = dataTuple._1
val data2 = dataTuple._2
val inner = data1.join(data2,data1("name") === data2("name") ,"inner").show(100)
Result:
age | name | sex | age | name | vip |
18 | zhangsan | N | 18 | zhangsan | f |
full, fullouter, full_outer and outer are all the same
val dataTuple = joinTestData()
val data1 = dataTuple._1
val data2 = dataTuple._2
val full = data1.join(data2,data1("name") === data2("name") ,"full").show(100)
Result:
age | name | sex | age | name | vip |
null | null | null | 18 | wangwu | t |
null | null | null | null | null | null |
18 | zhangsan | N | 18 | zhangsan | f |
null | null | null | 19 | sunliu | t |
19 | lisi | F | null | null | null |
leftsemi (an inner join that keeps only the left side's columns)
val dataTuple = joinTestData()
val data1 = dataTuple._1
val data2 = dataTuple._2
val leftsemi = data1.join(data2,data1("name") === data2("name") ,"leftsemi").show(100)
Real-world use: a project often has one wide table keyed on user ID that holds all of a user's basic information. Most queries need to join against this big table, and leftsemi is typically used to shrink it down to only the keys that are actually needed (a sketch of this pattern follows the result below).
Result:
age | name | sex |
18 | zhangsan | N |
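As mentioned above, leftsemi is handy for shrinking a wide table before further processing. The sketch below is hedged: bigUserTable and activeUsers are hypothetical DataFrames, not part of this article's test data.
// Hedged sketch: keep only the rows of the (hypothetical) wide user table whose user_id
// also appears in the (hypothetical) activeUsers DataFrame; none of activeUsers' columns are kept
val slimUserTable = bigUserTable.join(
  activeUsers,
  bigUserTable("user_id") === activeUsers("user_id"),
  "leftsemi")
slimUserTable.show(100)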
leftanti (the opposite of leftsemi: keeps only the left-side rows that have no match on the right)
val dataTuple = joinTestData()
val data1 = dataTuple._1
val data2 = dataTuple._2
val leftanti = data1.join(data2,data1("name") === data2("name") ,"leftanti").show(100)
Result:
age | name | sex |
null | null | null |
19 | lisi | F |