Note_Spark_Day13:Structured Streaming
Posted ChinaManor
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Note_Spark_Day13:Structured Streaming相关的知识,希望对你有一定的参考价值。
Spark Day13:Structured Streaming
01-[了解]-上次课程内容回顾
主要讲解2个方面内容:SparkStreaming中偏移量管理和StructuredStreaming快速入门
1、SparkStreaming中偏移量管理
- 统计类型应用,重启以后如何继续运行
状态State
继续消费Kafka数据(偏移量)
- Checkpoint 检查点
当流式应用再次重启运行时,从检查点目录构建应用程序(StreamingContext对象)
StreamingContext.getActiveOrCreate(ckptDir, () => StreamingContext)
- 手动管理偏移量
可以将流式应用每次消费Kafka数据,偏移量存储外部系统中,比如mysql数据库表、Zookeeper或HBase等
演示:将偏移量保存到MySQL表中
表的设计:
groupId、topic、partition、offset
编写工具类:
读取表中偏移量
保存每批次消费后偏移量
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4qt7a1eL-1620458282836)(/img/image-20210507150501579.png)]
2、StructuredStreaming
SparkStreaming 不足
。。。。。
StructuredStreaming 设计思想
。。。。。。
Spark2.0提供新型的流式计算框架,以结构化方式处理流式数据,将流式数据封装到Dataset/DataFrame中
思想:
将流式数据当做一个无界表,流式数据源源不断追加到表中,当表中有数据时,立即进行增量处理分析,最终按照设置输出模式,将结果数据输出
模型:
第一层、无界表,输入表:input table
第二层、增量查询,默认情况一有(1条数据或者多条数据)数据就查询
本质上还是微批处理
第三层、结果表:result table
增量查询时,会将结果表以前的数据进行合并:state状态更新
第四层、输出数据
按照OutputMode,将结果表的数据进行输出
- Append,默认值,追加数据
- Update,当结果表有数据更新再输出
- Complete,不管三七二十一,直接将结果表数据全部输出
入门案例
第一步、运行官方案例,从netcat消费数据,进行词频统计,打印控制台
第二步、编写程序,实现功能
SparkSession程序入口,加载流式数据spark.readStream,封装到流式数据集DataFrame
分析数据,直接使用DSL编程或者SQL编程
输出结果数据
val query: StreamingQuery = streamDF.writeStream.xxx.start() // 启动流式应用
query.awaitTermination()
query.stop()
02-[掌握]-词频统计WordCount(SQL编程)
修改词频统计WordCount代码,使用SQL分析处理,具体代码如下:
package cn.itcast.spark.start
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。
* 第一点、程序入口SparkSession,加载流式数据:spark.readStream
* 第二点、数据封装Dataset/DataFrame中,分析数据时,建议使用DSL编程,调用API,很少使用SQL方式
* 第三点、启动流式应用,设置Output结果相关信息、start方法启动应用
*/
object _01StructuredWordCountSQL {
def main(args: Array[String]): Unit = {
// TODO: step1. 构建SparkSession实例对象,相关配置进行设置
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
import spark.implicits._
// TODO: step2. 从TCP Socket加载数据,读取数据列名称为value,类型是String
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1.itcast.cn")
.option("port", "9999")
.load()
/*
root
|-- value: string (nullable = true)
*/
//inputStreamDF.printSchema()
// TODO: step3. 进行词频统计,基于SQL分析
// 第一步、将DataFrame注册为临时视图
inputStreamDF.createOrReplaceTempView("view_temp_lines")
// 第二步、编写SQL语句并执行
val resultStreamDF: DataFrame = spark.sql(
"""
|WITH tmp AS (
| SELECT explode(split(trim(value), '\\\\s+')) AS word FROM view_temp_lines
|)
|SELECT word, COUNT(1) AS count FROM tmp GROUP BY word
|""".stripMargin)
/*
root
|-- word: string (nullable = true)
|-- count: long (nullable = false)
*/
// TODO: step4. 将结果输出(ResultTable结果输出,此时需要设置输出模式)
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Update()) // 表示当词频更新时,再输出
.format("console")
.option("numRows", "10")
.option("truncate", "false")
// 启动流式应用
.start()
// TODO: step5. 启动流式应用后,等待终止
query.awaitTermination()
query.stop()
}
}
03-[了解]-今日课程内容提纲
主要3个方面内容:内置数据源、自定义Sink(2种方式)和集成Kafka
1、内置数据源【了解】
File Source,监控某个目录下新的文件数据
Rate Source,产生随机数据数据源
2、StreamingQuery 流式查询器基本属性设置【理解】
名称
触发时间间隔
检查点
输出模式
如何保存流式应用End-To-End精确性一次语义
3、集成Kafka【掌握】
结构化流从Kafka消费数据,封装为DataFrame;将流式数据集DataFrame保存到Kafka Topic
- 数据源Source
- 数据终端Sink
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bSARTqmP-1620458282837)(/img/image-20210507153929371.png)]
04-[了解]-内置数据源之File Source 使用
从Spark 2.0至Spark 2.4版本,目前支持数据源有4种,其中Kafka 数据源使用作为广泛,其他数据源主要用于开发测试程序。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-boyzKF7d-1620458282838)(/img/image-20210507154039375.png)]
在Structured Streaming中使用
SparkSession#readStream
读取流式数据,返回DataStreamReader
对象,指定读取数据源相关信息,声明如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7GCVrmNQ-1620458282838)(/img/image-20210507154218794.png)]
查看DataStreamReader中方法可以发现与DataFrameReader中基本一致,编码上更加方便加载流式数据。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sehIuLZe-1620458282838)(/img/image-20210507154306506.png)]
文件数据源(File Source):将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GSxlX3l1-1620458282839)(/img/image-20210507154529822.png)]
可以设置相关可选参数:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vIimaBjt-1620458282839)(/img/image-20210507154553532.png)]
演示范例:监听某一个目录,读取csv格式数据,统计年龄小于25岁的人群的爱好排行榜。
package cn.itcast.spark.source
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
/**
* 使用Structured Streaming从目录中读取文件数据:统计年龄小于25岁的人群的爱好排行榜
*/
object _02StructuredFileSource {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象,相关配置进行设置
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
// 设置Shuffle时分区数目
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
import spark.implicits._
// TODO: 从文件数据源加载数据,本质就是监控目录
val schema: StructType = new StructType()
.add("name", StringType, nullable = true)
.add("age", IntegerType, nullable = true)
.add("hobby", StringType, nullable = true)
val inputStreamDF: DataFrame = spark.readStream
.schema(schema)
.option("sep", ";")
.csv("file:///D:/datas/")
// TODO: 监听某一个目录,读取csv格式数据,统计年龄小于25岁的人群的爱好排行榜。
val resultStreamDF: DataFrame = inputStreamDF
// 年龄小于25岁
.filter($"age" < 25)
// 按照爱好分组,统计个数
.groupBy($"hobby").count()
// 排行榜,依据个数降序排序
.orderBy($"count".desc)
// TODO: 将结果输出(ResultTable结果输出,此时需要设置输出模式)
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Complete()) // 当数据更新时再进行输出: mapWithState
.format("console")
.option("numRows", "10")
.option("truncate", "false")
.start()
// 启动流式应用后,等待终止
query.awaitTermination()
query.stop()
}
}
05-[了解]-内置数据源之Rate Source 使用
以每秒指定的行数生成数据,
每个输出行包含2个字段:timestamp和value。其中timestamp是一个Timestamp含有信息分配的时间类型,并且value是Long(包含消息的计数从0开始作为第一
行)类型。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cN7ZOcXy-1620458282840)(/img/image-20210507155837251.png)]
演示范例代码如下:
package cn.itcast.spark.source
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 数据源:Rate Source,以每秒指定的行数生成数据,每个输出行包含一个timestamp和value。
*/
object _03StructuredRateSource {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象,相关配置进行设置
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
// 设置Shuffle时分区数目
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
import spark.implicits._
// TODO:从Rate数据源实时消费数据
val rateStreamDF: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "10")
.option("numPartitions", "2")
.load()
// TODO: 将结果输出(ResultTable结果输出,此时需要设置输出模式)
val query: StreamingQuery = rateStreamDF.writeStream
.outputMode(OutputMode.Append()) // 当数据更新时再进行输出: mapWithState
.format("console")
.option("numRows", "500")
.option("truncate", "false")
.start()
// 启动流式应用后,等待终止
query.awaitTermination()
query.stop()
}
}
06-[掌握]-基础特性之名称、触发、检查点及输出模式设置
在StructuredStreaming中定义好Result DataFrame/Dataset后,调用writeStream()返回DataStreamWriter对象,设置查询Query输出相关属性,启动流式应用运行,相关属性如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-o71q3374-1620458282840)(/img/image-20210507160752844.png)]
第一、输出模式
"
Output
"是用来定义写入外部存储器的内容,输出可以被定义为不同模式:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lFcgGv1A-1620458282841)(/img/image-20210507161053539.png)]
第二、查询名称
可以给每个查询Query设置名称Name,必须是唯一的,直接调用
DataFrameWriter
中queryName
方法即可,实际生产开发建议设置名称,API说明如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rxoDUadK-1620458282842)(/img/image-20210507161211168.png)]
第三、触发间隔
触发器Trigger决定了多久执行一次查询并输出结果
,当不设置时,默认只要有新数据
,就立即执行查询Query,再进行输出。目前来说,支持三种触发间隔设置:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-caTZkGfH-1620458282842)(/img/image-20210507161259941.png)]
第四、检查点位置
在Structured Streaming中使用Checkpoint 检查点进行故障恢复。如果实时应用发生故障或关机,可以恢复之前的查询的进度和状态,并从停止的地方继续执行,使用Checkpoint和预写日志WAL完成。
此检查点位置必须是HDFS兼容文件系统中的路径,两种方式设置Checkpoint Location位置:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LhPoNF56-1620458282842)(/img/image-20210507161558806.png)]
修改上述词频统计案例程序,设置输出模式、查询名称、触发间隔及检查点位置,演示代码如下:
package cn.itcast.spark.output
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。
* 设置输出模式、查询名称、触发间隔及检查点位置
*/
object _04StructuredQueryOutput {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象,相关配置进行设置
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
// 设置Shuffle时分区数目
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
import spark.implicits._
// 从TCP Socket加载数据,读取数据列名称为value,类型是String
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1.itcast.cn")
.option("port", 9999)
.load()
// 进行词频统计
val resultStreamDF: DataFrame = inputStreamDF
.as[String] // 将DataFrame转换为Dataset
.filter(line => null != line && line.trim.length > 0 )
.flatMap(line => line.trim.split("\\\\s+"))
// 按照单词分组和聚合
.groupBy($"value").count()
resultStreamDF.printSchema()
// 将结果输出(ResultTable结果输出,此时需要设置输出模式)
val query: StreamingQuery = resultStreamDF.writeStream
// TODO: a. 设置输出模式, 当数据更新时再进行输出
.outputMode(OutputMode.Update())
// TODO: b. 设置查询名称
.queryName("query-wordcount")
// TODO: c. 设置触发时间间隔
.trigger(Trigger.ProcessingTime("0 seconds"))
.format("console")
.option("numRows", "10")
.option("truncate", "false")
// TODO: d. 设置检查点目录
.option("checkpointLocation", "datas/ss-ckpt/0001")
.start()
// 启动流式应用后,等待终止
query.awaitTermination()
query.stop()
}
}
07-[了解]-自定义Sink之综合概述
目前Structured Streaming内置FileSink、Console Sink、
Foreach Sink(ForeachBatch Sink)
、Memory Sink及Kafka Sink
,其中测试最为方便的是Console Sink
。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-migB7WXu-1620458282843)(/img/image-20210507164654578.png)]
其中最终重要三个Sink:
第一个、Console Sink
直接将流式数据集打印到控制台
测试开发使用
第二个、Foreach Sink / ForeachBatch Sink
提供自定义流式数据输出接口
Foreach Sink ,表示针对每条数据操作
ForeachBatch Sink,表示针对每个微批处理结果数据操作
第三个、Kafka Sink
将流式数据写入到Kafka Topic中
File Sink(文件接收器)
将输出存储到目录文件中,支持文件格式:parquet、orc、json、csv等,示例如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xteLKfap-1620458282844)(/img/image-20210507165003972.png)]
Memory Sink(内存接收器)
输出作为内存表存储在内存中, 支持Append和Complete输出模式。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wUYKqyaY-1620458282845)(/img/image-20210507165057431.png)]
08-[掌握]-自定义Sink之foreach使用
Structured Streaming提供接口
foreach
和foreachBatch
,允许用户在流式查询的输出上应用任意操作和编写逻辑,比如输出到MySQL表、Redis数据库等外部存系统。
foreach
允许每行自定义写入逻辑(每条数据进行写入)foreachBatch
允许在每个微批量的输出
上进行任意操作和自定义逻辑,从Spark 2.3版本提供
foreach表达自定义编写器逻辑具体来说,需要编写类class继承ForeachWriter,其中包含三个方法来表达数据写入逻辑:打开,处理和关闭。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sbyzo3hi-1620458282845)(/img/image-20210507165534532.png)]
演示案例:将前面词频统计结果输出到MySQL表【
db_spark.tb_word_count
】中。
package cn.itcast.spark.sink.foreach
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果存储到MySQL数据库表中
*/
object _05StructuredMySQLSink {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象,相关配置进行设置
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
// 设置Shuffle时分区数目
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
import spark.implicits._
// 从TCP Socket加载数据,读取数据列名称为value,类型是String
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1.itcast.cn")
.option("port", 9999)
.load()
// 进行词频统计
val resultStreamDF: DataFrame = inputStreamDF
.as[String] // 将DataFrame转换为Dataset
.filter(line => null != line && line.trim.length > 0 )
.flatMap(line => line.trim.split("\\\\s+"))
// 按照单词分组和聚合
.groupBy($"value").count()
//resultStreamDF.printSchema()
// 将结果输出(ResultTable结果输出,此时需要设置输出模式)
val query: StreamingQuery = resultStreamDF.writeStream
// a. 设置输出模式, 当数据更新时再进行输出: mapWithState
.outputMode(OutputMode.Update())
// b. 设置查询名称
.queryName("query-wordcount")
// c. 设置触发时间间隔
.trigger(Trigger.ProcessingTime(0, TimeUnit.SECONDS))
// TODO: 使用foreach方法,自定义输出结果,写入MySQL表中
// def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
.foreach(new MySQLForeachWriter())
// d. 设置检查点目录
.option("checkpointLocation", "datas/spark/structured-ckpt-1002")
.start()
// 启动流式应用后,等待终止
query.awaitTermination()
query.stop()
}
}
其中自定义输出Writer:
MySQLForeachWriter
,代码如下;
package cn.itcast.spark.sink.foreach
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* 创建类继承ForeachWriter,将数据写入到MySQL表中,泛型为:Row,针对DataFrame操作,每条数据类型就是Row
*/
class MySQLForeachWriter extends ForeachWriter[Row] {
// 定义变量
var conn: Connection = _
var pstmt: PreparedStatement = _
val jdbcUrl: String = "jdbc:mysql://node1.itcast.cn:3306/db_spark?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"
val insertSQL = "REPLACE INTO `tb_word_count` (`id`, `word`, `count`) VALUES (NULL, ?, ?)"
// 获取MySQL数据连接, 如果获取连接成功,返回true,进行向下执行
override def open(partitionId: Long, epochId: Long): Boolean = {
// step1. 加载驱动类
Class.forName("com.mysql.cj.jdbc.Driver")
// step2. 获取连接
conn = DriverManager.getConnection(
jdbcUrl, "root", "123456"
)
// step3. 构建PreparedStatement对象
pstmt = conn.prepareStatement(insertSQL)
// TODO: 返回true,表示连接获取成功
true
}
// 如何将每条数据写入到MySQL表中
override def process(row: Row): Unit = {
// step4. 设置每条数据值得值到Statement对象中
pstmt.setString(1, row.getString(0))
pstmt.setInt(2, row.getInt(1))
// step5. 执行插入
pstmt.executeUpdate()
}
// 写入结束,关闭数据库连接
override def close(errorOrNull: Throwable): Unit = {
// step6. 关闭连接
if(null != pstmt) pstmt.close()
if(null != conn) conn.close()
}
}
09-[掌握]-自定义Sink之foreachBatch使用
方法
foreachBatch
允许指定在流式查询的每个微批次的输出数据上执行的函数,需要两个参数:微批次的输出数据DataFrame或Dataset、微批次的唯一ID。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uxpjk6Rv-1620458282845)(/img/image-20210507171309782.png)]
使用foreachBatch
函数输出时,以下几个注意事项:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DPFiEoTQ-1620458282846)(/img/image-20210507171509412.png)]
范例演示:使用foreachBatch将词频统计结果输出到MySQL表中,代码如下:
package cn.itcast.spark.sink.batch
import java.util.concurrent.TimeUnit
import org.apacheNote_Spark_Day12: StructuredStreaming入门
Note_Spark_Day11:Spark Streaming
Note_Spark_Day12:Structured Streaming