Note_Spark_Day12: StructuredStreaming入门
Posted 大数据Manor
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Note_Spark_Day12: StructuredStreaming入门相关的知识,希望对你有一定的参考价值。
stypora-copy-images-to: img
typora-root-url: ./
Spark Day12:Structured Streaming
01-[了解]-上次课程内容回顾
主要讲解
SparkStreaming
如何企业开发:集成Kafka、三大应用场景(实时增量ETL、状态累加统计、窗口分析统计)。
1、集成Kafka
由于Kafka框架提供2套Consumer API,所以集成Kafka时,也提供2套API,但是推荐使用New Consumer API
- KafkaConsumer
- ConsumerRecord<Key, Value>,都是String类型
http://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
GAV:org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.5
方法:
val kafkaDStream: DStream[String, String] = KafkaUtils.createDirectStream
直接从Kafka消费数据获取数据流中,每批次RDD是KafkaRDD
原理:
每批次BatchInterval时间间隔,依据偏移量范围到Kafka Topic中各个分区获取相应范围数据
从Kafka消费数据时,属性设置:
"enable.auto.commit" -> (false: java.lang.Boolean)
从Kafka消费数据时,不仅可以指定某个Topic获取或某些Topic,而且还有指定正则表达式,很方便消费多个Topic
SparkStreaming流式计算模块,在实际项目中有3大应用场景:主要如下所示
2、实时增量ETL,【实际项目中,此种应用类型最多】
实时将海量业务数据,进行实时ETL转换,存储到外部存储引擎,以便系统进行分析处理
业务数据一产生发送到 Kafka Topic -> 流式应用程序:ETL转换 -> HBase/ES
使用2个函数:
transform转换函数,针对每批次RDD进行转换处理,返回还是RDD
foreachRDD输出函数,针对每批次RDD进行输出,返回值为Unit
输出函数模式:
dstream.forearchRDD((rdd, batchTime) => {
// 每批次RDD针对每个分区数据进行操作,适当考虑是否降低分区数目
rdd.coalease(1).forearchPartition{iter =>
// 从连接池中获取连接
val conn: Connection
// 将每个分区数据进行保存,考虑批量保存
iter.foreach{item =>
}
// 将连接放回连接池中
conn.release()
}
})
3、状态累加统计
实时对数据进行聚合操作,并且状态属于累加统计的,比如双11大屏计算销售额
updateStateByKey 函数
依据Key更新状态的,需要定义状态更新函数,表示如何更新状态
updateFunc:
(values: Seq[V], state: Option[S]) => Option[S]
values: 表示当前批次中Key对应的所有Value的值
state:表示当前Key以前的状态,如果没有状态就是None
mapWithState 函数
依据Key更新状态,当Key存在时,才更新状态,否则不更新,性能远远由于updateStateByKey
StateSpec对象
StateSpec.function函数创建实例,传递map映射函数,针对每条数据进行状态更新处理
(KeyType, Option[ValueType], State[StateType]) => MappedType
Key类型 Value值数据类型 状态数据类型 返回数据类型
保存以前状态State,所以设置Checkpoint检查的目录,存储State数据
ssc.checkpoint("datas/streaming-ckpt-999999")
4、窗口分析统计
描述需求:每隔多长时间,统计最近多久范围内数据情况(趋势统计)
比如每隔1分钟统计最近20分钟内数据情况
窗口统计:
window size = 20 分钟
slide size = 1 分钟
分为2种类型窗口:
当window size = slide size : 滚动窗口,数据不会被重复处理
当window sieze > slide size : 滑动窗口,数据会被重复处理
函数:
window函数,设置窗口大小和滑动大小
将聚合函数和窗口函数合在一起:
reduceByKeyAndWindow
窗口大小和滑动大小,还需要设置聚合函数
02-[了解]-今日课程内容提纲
2个方面内容:偏移量管理(Checkpoint检查点)和
StructuredStreaming
入门(新的流式计算模块)
1、偏移量管理
SparkStreaming从Kafka消费数据时,如何管理偏移量,实现实时流式应用容灾恢复
方式一:
Checkpoint检查点恢复偏移量,继续消费数据
方式二:
用户手动管理偏移量,进行存储和读取,续集消费数据
推荐此种方式,相当来说比较麻烦,了解思路即可
【此部分内容,属于SparkStreaming模块处理流式数据一个不足之处,一大软点,使得用户不喜欢框架】
2、StructuredStreaming 快速入门
数据结构:DataFrame/Dataset,流式数据集
- 2.x提出结构化流模块处理流式数据
SparkStreaming不足之处
StructuredStreaming 设计原理和编程模型
- 入门案例:词频统计WordCount
实时累加统计
代码就是SparkSQL词频统计代码(DSL和SQL)
- 内置数据源,了解即可,几乎项目不用
- StructuredStreaming应用程序基本设置
03-[理解]-偏移量管理之引例和概述
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6wtQxLP6-1626354186973)(/img/image-20210506154426999.png)]
- 方式一:
Checkpoint
恢复
- 方式二:
手动管理偏移量和加载状态
程序中指定加载上一次状态信息,继续运行累加计算状态。
04-[理解]-偏移量管理之重构代码
实际项目开发中,为了代码重构复用和代码简洁性,将【从数据源读取数据、实时处理及结果输出】封装到方法【
processData
】中,类的结构如下:
Streaming流式应用模板完整代码:
package cn.itcast.spark.app.ckpt
import cn.itcast.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
* 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
*/
object _01StreamingTemplate {
/**
* 抽象一个函数:专门从数据源读取流式数据,经过状态操作分析数据,最终将数据输出
* @param ssc 流式上下文StreamingContext实例对象
*/
def processData(ssc: StreamingContext): Unit = {
// 1. 从Kafka消费数据,使用Kafka New Consumer API
val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
.consumerKafka(ssc, "search-log-topic")
// 2. 对每批次的数据进行搜索词进行次数统计
val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
val reduceRDD: RDD[(String, Int)] = rdd
// 过滤不合格的数据
.filter{ record =>
val message: String = record.value()
null != message && message.trim.split(",").length == 4
}
// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
.map{record =>
val keyword: String = record.value().trim.split(",").last
keyword -> 1
}
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
// 返回
reduceRDD
}
// 3、实时累加统计搜索词搜索次数,使用mapWithState函数
/*
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType]
*/
// 状态更新函数,针对每条数据进行更新状态
val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
// mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
(keyword: String, countOption: Option[Int], state: State[Int]) => {
// a. 获取当前批次中搜索词搜索次数
val currentState: Int = countOption.getOrElse(0)
// b. 从以前状态中获取搜索词搜索次数
val previousState = state.getOption().getOrElse(0)
// c. 搜索词总的搜索次数
val latestState = currentState + previousState
// d. 更行状态
state.update(latestState)
// e. 返回最新搜索次数
(keyword, latestState)
}
)
// 调用mapWithState函数进行实时累加状态统计
val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
// 5. 将结果数据输出 -> 将每批次的数据处理以后输出
stateDStream.print()
}
def main(args: Array[String]): Unit = {
// 1. 获取StreamingContext实例对象
/*
def getActiveOrCreate(
checkpointPath: String, // 检查点目录
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext
*/
val ssc: StreamingContext = {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// TODO: 设置消费最大数量
/*
每秒钟消费每个分区数据最大量:1W
topic: 3个分区,batchInterval:5s
问:每批次数据消费最大量是多少?
1w * 3 * 5 = 15W
*/
.set("spark.streaming.kafka.maxRatePerPartition", "10000")
// b. 传递SparkConf和BatchInterval创建流式上下对象
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回实例对象
context
}
// TODO: 设置检查点目录
ssc.checkpoint("datas/streaming/state-8888")
// TODO:实时处理流式数据
processData(ssc)
// TODO: 启动流式应用,等待终止(人为或程序异常)
ssc.start()
ssc.awaitTermination() // 流式应用启动以后,一直等待终止,否则一直运行
// 无论是否异常最终关闭流式应用(优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}
扩展知识点:Scala语言中设计模式【
贷出模式
】
- 贷出函数:管理资源【获取资源和关闭资源】
- 用户函数:业务逻辑实现地方
- MAIN方法,调用贷出函数,将用户函数传递给贷出函数
05-[理解]-偏移量管理之Checkpoint编码实现
针对Spark Streaming状态应用程序,设置
Checkpoint
检查点目录,其中存储两种类型数据:
Metadata Checkpointing
用来恢复 Driver
;Data Checkpointing
用来容错stateful的数据处理失败
的场景 。
当我们再次运行Streaming Application时,只要从Checkpoint 检查点目录恢复,构建StreamingContext应用,就可以继续从上次消费偏移量消费数据。
使用
StreamingContext
中【getActiveOrCreate
】方法构建StreamingContext实例对象,方法声明如下:
若Application为首次重启,将创建一个新的StreamingContext实例;如果Application从失败中重启,从checkpoint目录导入checkpoint数据来重新创建StreamingContext实例。
修改上述案例代码:
package cn.itcast.spark.app.ckpt
import cn.itcast.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
* 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
*/
object _02StreamingStateCkpt {
/**
* 抽象一个函数:专门从数据源读取流式数据,经过状态操作分析数据,最终将数据输出
* @param ssc 流式上下文StreamingContext实例对象
*/
def processData(ssc: StreamingContext): Unit = {
// 1. 从Kafka消费数据,使用Kafka New Consumer API
val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
.consumerKafka(ssc, "search-log-topic")
// 2. 对每批次的数据进行搜索词进行次数统计
val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
val reduceRDD: RDD[(String, Int)] = rdd
// 过滤不合格的数据
.filter{ record =>
val message: String = record.value()
null != message && message.trim.split(",").length == 4
}
// 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
.map{record =>
val keyword: String = record.value().trim.split(",").last
keyword -> 1
}
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
// 返回
reduceRDD
}
// 3、实时累加统计搜索词搜索次数,使用mapWithState函数
/*
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType]
*/
// 状态更新函数,针对每条数据进行更新状态
val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
// mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
(keyword: String, countOption: Option[Int], state: State[Int]) => {
// a. 获取当前批次中搜索词搜索次数
val currentState: Int = countOption.getOrElse(0)
// b. 从以前状态中获取搜索词搜索次数
val previousState = state.getOption().getOrElse(0)
// c. 搜索词总的搜索次数
val latestState = currentState + previousState
// d. 更行状态
state.update(latestState)
// e. 返回最新搜索次数
(keyword, latestState)
}
)
// 表示,在启动应用时,可以初始化状态,比如从Redis中读取状态数据,转换为RDD,进行赋值初始化操作
/*
def initialState(rdd: RDD[(KeyType, StateType)]): this.type
*/
//spec.initialState()
// 调用mapWithState函数进行实时累加状态统计
val stateDStream: DStream[(String, Int)] = reduceDStream
.mapWithState(spec)
.filter(tuple => tuple._2 >= 10)
// 5. 将结果数据输出 -> 将每批次的数据处理以后输出
stateDStream.print()
}
def main(args: Array[String]): Unit = {
//TODO: 检查点目录
val CKPT_DIR: String = "datas/streaming/state-1000"
// 1. 获取StreamingContext实例对象
/*
def getActiveOrCreate(
checkpointPath: String, // 检查点目录
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext
*/
val ssc: StreamingContext = StreamingContext.getActiveOrCreate(
CKPT_DIR, // 如果目录存在,从Checkpoint数据恢复构建StreamingContext对象,包括DStream创建、转换和输出
// 匿名函数,函数参数没有,返回值要求:StreamingContext对象
() => { // CKPT不存在时,调用此函数构建StreamingContext对象,读取数据,转换和输出
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// 设置消费最大数量
.set("spark.streaming.kafka.maxRatePerPartition", "10000")
// b. 传递SparkConf和BatchInterval创建流式上下对象
val context = new StreamingContext(sparkConf, Seconds(5))
// c. TODO: 处理数据
processData(context)
// d. 返回流式上下文对象
context
}
)
// TODO: 设置检查点目录
ssc.checkpoint(CKPT_DIR)
// TODO: 启动流式应用,等待终止(人为或程序异常)
ssc.start()
ssc.awaitTermination() // 流式应用启动以后,一直等待终止,否则一直运行
// 无论是否异常最终关闭流式应用(优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}
当Streaming Application再次运行时,从Checkpoint检查点目录恢复时,有时有问题,比如修改程序,再次从运行时,可能出现类型转换异常,如下所示:
原因在于修改DStream转换操作,在检查点目录中存储的数据没有此类的相关代码,ClassCastException异常。
此时无法从检查点读取偏移量信息和转态信息,所以SparkStreaming中Checkpoint功能,属于鸡肋,食之无味,弃之可惜。
06-[理解]-偏移量管理之手动管理偏移量和状态思路
SparkStreaming中
Checkpoint
功能,属于鸡肋,食之无味,弃之可惜
。
- 解决问题一:状态State,针对实数累计统计来说,再次运行流式应用,获取上次状态
- 解决问题二:偏移量,从Kafka消费数据位置,再次运行应用时,继续上次消费位置消费数据
解决方案:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WBJj1lCY-1626354186980)(/img/image-20210506164820304.png)]
当运行流式应用程序时,首先从状态存储系统获取状态数据,进行状态初始化操作。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GwzyGO7b-1626354186981)(/img/image-20210506164851608.png)]
保存每批次数据偏移量信息到存储系统中,比如mysql表、Zookeeper节点等,当再次运行流式应用时,从保存系统加载偏移量消息,继续消费数据。
07-[理解]-偏移量管理之MySQL存储偏移量
此处将偏移量数据存储到MySQL表中,数据库及表的DDL和DML语句如下:
-- 1. 创建数据库的语句
CREATE DATABASE IF NOT EXISTS db_spark DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
USE db_spark ;
-- 2. 创建表的语句
CREATE TABLE `tb_offset` (
`topic` varchar(255) NOT NULL,
`partition` int NOT NULL,
`groupid` varchar(255) NOT NULL,
`offset` bigint NOT NULL,
PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ;
-- 3. 插入数据语句replace
replace into tb_offset (`topic`, `partition`, `groupid`, `offset`) values(?, ?, ?, ?)
--/*
--
replace语句执行时,分以下两种情况:
--
- 情况1:insert,当不存或唯一索引冲突,相当于insert操作
--
- 情况2:delete and insert,当存在主键冲突或唯一索引冲突,相当于delete操作,加insert操作
--*/
-- 4. 查询数据语句select
select * from tb_offset where topic in ('xx', 'yy') AND groupid = 'gid001' ;
select * from tb_offset where topic in (?) and groupid = ? ;
工具类
OffsetsUtils
从MySQL数据库表中读取消费的偏移量信息和保存最近消费的偏移量值,示意图如下所示:
工 具 类 中 包 含 如 何 保 存 偏 移 量 【
saveOffsetsToTable
】 和 读 取 偏 移 量【getOffsetsToMap
】两个函数,具体代码如下:
package cn.itcast.spark.app.offset
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scala.collection.mutable
/**
* 将消费Kafka Topic偏移量数据存储MySQL数据库,工具类用于读取和保存偏移量数据
*/
object OffsetsUtils {
/**
* 依据Topic名称和消费组GroupId获取各个分区的偏移量
*
*@param topicNames Topics名称
*@param groupId 消费组ID
**/
def getOffsetsToMap(topicNames: Array[String], groupId: String): Map[TopicPartition, Long] ={
// 构建集合
val map: mutable.Map[TopicPartition, Long] = scala.collection.mutable.Map[TopicPartition, Long]()
// 声明变量
var conn: Connection = null
var pstmt: PreparedStatement = null
var result: ResultSet = null
try{
// a. 加载驱动类
Class.forName("com.mysql.cj.jdbc.Driver")
// b. 获取连接
conn = DriverManager.getConnection(
"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
"root", //
"123456" //
)
// c. 编写SQL,获取PreparedStatement对象
// Array("1-topic", "2-topic") -> topic in ("1-topic", "2-topic")
val topicNamesStr = topicNames.map(topicName => s"\\'$topicName\\'").mkString(", ")
val querySQL =
s"""
|SELECT
| `topic`, `partition`, `groupid`, `offset`
|FROM
| db_spark.tb_offset
|WHERE
| groupid = ? AND topic in ($topicNamesStr)
|""".stripMargin
pstmt = conn.prepareStatement(querySQL)
pstmt.setString(1, groupId)
// d. 查询数据
result = pstmt.executeQuery()
// e. 遍历获取值
while (result.next()){
val topicName = result.getString("topic")
val partitionId = result.getInt("partition")
val offset = result.getLong("offset")
// 加入集合中
map += new TopicPartition(topicName, partitionId) -> offset
}
}catch {
case e: Exception => e.printStackTrace()
}finally {
if(null != result) result.close()
if(null != pstmt) pstmt.close()
if(null != conn) conn.close()
}
// 返回集合,转换为不可变的
map.toMap
}
/**
* 保存Streaming每次消费Kafka数据后最新偏移量到MySQL表中
*
* @param offsetRanges Topic中各个分区消费偏移量范围
* @param groupId 消费组ID
*/
def saveOffsetsToTable(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
// 声明变量
var conn: Connection = null
var pstmt: PreparedStatement = null
try{
// a. 加载驱动类
Class.forName("com.mysql.cj.jdbc.Driver")
// b. 获取连接
conn = DriverManager.getConnection(
"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
"root", //
"123456" //
)
// c. 编写SQL,获取PreparedStatement对象
val insertSQL = "replace into db_spark.tb_offset (`topic`, `partition`, `groupid`, `offset`) values (?, ?, ?, ?)"
pstmt = conn.prepareStatement(insertSQL)
// d. 设置参数
offsetRanges.foreach{offsetRange =>
println(offsetRange.toString())
pstmt.setString(1, offsetRange.topic)
pstmt.setInt(2, offsetRange.partition)
pstmt.setString(3, groupId)
pstmt.setLong(4, offsetRange.untilOffset)
// 加入批次
pstmt.addBatch(以上是关于Note_Spark_Day12: StructuredStreaming入门的主要内容,如果未能解决你的问题,请参考以下文章
Note_Spark_Day13:Structured Streaming
Note_Spark_Day11:Spark Streaming