Note_Spark_Day09:离线综合实战
Posted ChinaManor
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Note_Spark_Day09:离线综合实战相关的知识,希望对你有一定的参考价值。
Spark Day09:离线综合实战
01-[了解]-昨日课程内容回顾
上次课程:继续讲解SparkSQL模块内容【
Dataset
数据集、外部数据源(内置)、UDF函数定义、分布式SQL分析引擎(类似Hive)、Catalyst 优化器】
1、`Dataset`数据集
Spark 1.6提出,强类型(外部类型)和内部数据结构(Schema),特殊编码存储数据
Dataset = RDD + schema
Spark 2.0
数据结构:Dataset,DataFrame = Dataset[Row],Row表示一行数据,弱类型
Dataset创建方式
RDD创建:当RDD数据类型为CaseClass时,直接转换即可
DataFrame创建:指定CaseClass类型即可,dataframe.as[CaseClass]
RDD、DataFrame、Dataset 三者相互联系和转换
Dataset = RDD + Schema
=> DataFrame = RDD[Row] = Schema
DataFrame = Dataset[Row]
2、外部数据源(内置)
标准接口书写
- 加载数据load
spark.read.format().option().load()
- 保存数据save
dataframe.write.mode().format().option().save()
内置数据源
- parquet/orc 列式存储文件
- json 文本文件
format("json")
当做文本文件读取:textFile,使用函数:get_json_object获取JSON中某个字段的值
- text\\textFile
text:DataFrame
textFile:Dataset
字段名称:value,数据类型:String
- csv 文件加载
- 文件首行为列名称
sep、header、inferSchema
- 文件首行不是列名称
指定Schema信息
- jdbc,关系型数据RDBMS
以mysql数据库为例,加载数据
- 简洁方式
spark.read.jdbc()
- 通用方式
spark.read
.format("jdbc")
.option("", "")
.load
表的名称:dbTable,可以时子查询
(SELECT ... FROM ....) AS tmp
- hive表
SparkSQL与Hive集成,可以从Hive表中读取数据,进行分析,最后保存结果到Hive表中
启动Hive MetaStore服务
当运行Spark Application应用时,读取到配置信息,连接HiveMetaStore服务
- hbase数据库
实现SparkSQL提供外部数据源接口,可以方便从HBase表读写数据
spark.read.format("hbase").option("", "").load()
3、自定义UDF函数
定义2种方式
- 方式一、在SQL中使用
spark.udf.register(
"", // 函数名称
(param01, ...) => { ... } // 匿名函数
)
- 方式二、在DSL中使用
import org.apache.spark.sql.functions.udf
val udf_xx = udf(
(param01, ...) => { ... }
)
4、分布式SQL引擎,类似Hive框架
提供编写SQL地方,将SQL解析,转换为RDD操作,最后执行,处理数据。
- spark-sql命令行
类似Hive中bin/hive命令,基本使用很少
- Spark ThriftServer
将Spark 应用当做服务运行,通过JDBC/ODBC连接,编写SQL语句,发送服务,进行转换执行
- beeline 命令行
专门编写SQL语句
- JDBC代码
类似MYSQL数据库操作JDBC代码
5、Catalyst优化器
SparkSQL中核心,将SQL语句和DSL代码转换为逻辑计划(优化后逻辑计划)
包含三个部分:
未解析逻辑计划 -> 逻辑计划 -> 优化逻辑计划
02-[了解]-今日课程内容提纲
需求一: 数据ETL转换后,存储到Hive分区表中,查询如何结果
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qxgwdHoA-1620281896476)(/img/image-20210428083330170.png)]
需求二:简单报表分析,从Hive分区表加载昨日数据,进行统计分析,存储到MySQL结果表中
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GYxnQY0p-1620281896477)(/img/image-20210428083709213.png)]
需求三:业务报表分析,也是从Hive分区表加载昨日数据,进行统计分析,最终存储MySQL表
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oGU7vQTu-1620281896479)(/img/image-20210428083820226.png)]
附录一、创建Maven模块
1)、Maven 工程结构
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8WcftMpR-1620281896480)(/img/image-20210428070028257.png)]
2)、POM 文件内容
Maven 工程POM文件中内容(依赖包):
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.5</spark.version>
<hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
<mysql.version>8.0.19</mysql.version>
</properties>
<dependencies>
<!-- 依赖Scala语言 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark Core 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL 与 Hive 集成 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop Client 依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- MySQL Client 依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- 根据ip转换为省市区 -->
<dependency>
<groupId>org.lionsoul</groupId>
<artifactId>ip2region</artifactId>
<version>1.7.2</version>
</dependency>
<!-- 管理配置文件 -->
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<!-- Maven 编译的插件 -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
以上是关于Note_Spark_Day09:离线综合实战的主要内容,如果未能解决你的问题,请参考以下文章
Note_Spark_Day11:Spark Streaming
Note_Spark_Day10:Spark Streaming
Note_Spark_Day14:Structured Streaming