Structured Streaming总结
Posted Icedzzz
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Structured Streaming总结相关的知识,希望对你有一定的参考价值。
参考:
Spark官方文档
博客https://blog.csdn.net/u013468917/article/details/79643850?spm=1001.2014.3001.5501
0. SparkSQL与DStream流
在DStream中,我们想要对数据操作进行DataFrame操作和sql语句,必须使用foreachRDD/transform类算子操作DStream中每个RDD,同时通过StreamingContext创建SQLContext,然后将每个RDD转换成DataFrame以临时表格配置并用 SQL 进行查询。例如下:
val spark = SparkSession.builder.config(conf).getOrCreate()
import spark.implicits._
count.foreachRDD(rdd =>{
val df: DataFrame = rdd.toDF("word", "count")
df.createOrReplaceTempView("words")
spark.sql("select * from words").show
})
1. Structured Streaming 概述
从 spark2.0 开始, spark 引入了一套新的流式计算模型: Structured Streaming。该组件进一步降低了处理数据的延迟时间, 它实现了“有且仅有一次(Exectly Once)” 语义, 可以保证数据被精准消费。Structured Streaming 基于 Spark SQl 引擎, 是一个具有弹性和容错的流式处理引擎. 使用 Structure Streaming 处理流式计算的方式和使用批处理计算静态数据(表中的数据)的方式是一样的。随着流数据的持续到达, Spark SQL 引擎持续不断的运行并得到最终的结果. 我们可以使用 Dataset/DataFrame API 来表达流的聚合, 事件-时间窗口(event-time windows), 流-批处理连接(stream-to-batch joins)等等. 这些计算都是运行在被优化过的 Spark SQL 引擎上. 最终, 通过 chekcpoin 和 WALs(Write-Ahead Logs), 系统保证end-to-end exactly-once。
总之, Structured Streaming 提供了快速, 弹性, 容错, end-to-end exactly-once 的流处理, 而用户不需要对流进行推理(比如 spark streaming 中的流的各种转换)。
默认情况下, 在内部, Structured Streaming 查询使用微批处理引擎(micro-batch processing engine)处理, 微批处理引擎把流数据当做一系列的小批job(small batch jobs ) 来处理. 所以, 延迟低至 100 毫秒, 从 Spark2.3, 引入了一个新的低延迟处理模型:Continuous Processing, 延迟低至 1 毫秒。
总结:
- 基于SparkSQL引擎,可以基于DS/DF API和处理静态数据一样
- Structured Streaming 基于事件时间,DStream基于处理时间
- 实现了“有且仅有一次(Exectly Once)” 语义
- Structured Streaming 查询使用微批处理引擎
2.Structured Streaming 编程模型
Structured Streaming 的核心思想是:把持续不断的流式数据看做一个无界表,并且查询体验和静态表一致。
1. 输入表
把输入数据流当做输入表(Input Table). 到达流中的每个数据项(data item)类似于被追加到输入表中的一行.
2. 结果表
作用在输入表上的查询将会产生“结果表(Result Table)”. 每个触发间隔(trigger interval, 例如 1s), 新行被追加到输入表, 最终会更新结果表. 无论何时更新结果表, 我们都希望将更改的结果行写入到外部接收器(external sink)
3. 输出模式
输出模式(outputMode)有 3 种:
-
Complete Mode 整个更新的结果表会被写入到外部存储. 存储连接器负责决定如何处理整个表的写出(类似于 spark streaming 中的有转态的转换).
complete模式下,会将整个更新的结果表写出到外部存储,即会将整张结果表写出,重点注意"更新"这个词,这就意味着,使用Complete模式是需要有聚合操作的,因为在结果表中保存非聚合的数据是没有意义的,所以,当在没有聚合的query中使用complete输出模式,就会报错。 -
Append Mode 从上次触发结束开始算起, 仅仅把那些新追加到结果表中的行写到外部存储(类似于无状态的转换). 该模式仅适用于不会更改结果表中行的那些查询.,(如果有聚合操作, 则必须添加 wartemark, 否则不支持此种模式,如果使用了 watermask 机制, 则只能使用基于 event-time 的聚合操作.) 因为该模式主要用于存储不会再发生变动的数据,只有过期的聚合结果才会在 Append 模式中被“有且仅有一次”的输出.
如果没有使用watermask:
-
Update Mode 从上次触发结束开始算起, 仅仅在结果表中更新的行会写入到外部存储. 此模式从 2.1.1 可用. 注意, Update Mode 与 Complete Mode 的不同在于 Update Mode 仅仅输出改变的那些行. 如果查询不包括聚合操作, 则等同于 Append Mode
注意 :Structured Streaming 不会实现整个表. 它从流式数据源读取最新的可用数据, 持续不断的处理这些数据, 然后更新结果, 并且会丢弃原始数据. 它仅保持最小的中间状态的数据, 以用于更新结果(例如前面例子中的中间counts)
在Append模式下,设置watermark的聚合操作。与update模式不同,append模式不会立即输出结果集,而是等到设置的watermark下,再没有数据更新的情况下再输出到结果集。
4. WordCount
// 1. 创建 SparkSession. 因为 Structured Streaming 是基于 spark sql 引擎, 所以需要先创建 SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("WordCount1")
.getOrCreate()
import spark.implicits._
// 2. 从数据源(socket)中加载数据.
val lines: DataFrame = spark.readStream
.format("socket") // 设置数据源
.option("host", "node01")
.option("port", 9999)
.load
// 3. 把每行数据切割成单词
val words: Dataset[String] = lines.as[String].flatMap(_.split("\\\\W"))
// 4. 计算 word count
val wordCounts: DataFrame = words.groupBy("value").count()
// 5. 启动查询, 把结果打印到控制台
val query: StreamingQuery = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start
query.awaitTermination()
3.Structured Streaming 应用特点
3.1 处理事件-时间和延迟数据
Structured streaming 与其他的流式引擎有很大的不同. 许多系统要求用户自己维护运行的聚合, 所以用户自己必须推理数据的一致性(at-least-once, or at-most-once, or exactly-once). 在Structured streaming模型中, 当有新数据的时候, spark 负责更新结果表, 从而减轻了用户的推理工作。
Event-time 是指嵌入到数据本身的时间, 或者指数据产生的时间. 对大多数应用程序来说, 我们想基于这个时间去操作数据. 例如, 如果我们获取 IoT(Internet of Things) 设备每分钟产生的事件数, 我们更愿意使用数据产生时的时间(event-time in the data), 而不是 spark 接收到这些数据时的时间.
在这个模型中, event-time 是非常自然的表达. 来自设备的每个时间都是表中的一行, event-time 是行中的一列. 允许基于窗口的聚合(例如, 每分钟的事件数)仅仅是 event-time 列上的特殊类型的分组(grouping)和聚合(aggregation): 每个时间窗口是一个组,并且每一行可以属于多个窗口/组。因此,可以在静态数据集和数据流上进行基于事件时间窗口( event-time-window-based)的聚合查询,从而使用户操作更加方便。
此外, 该模型也可以自然的处理晚于 event-time 的数据, 因为spark 一直在更新结果表, 所以它可以完全控制更新旧的聚合数据,或清除旧的聚合以限制中间状态数据的大小。
自 Spark 2.1 起,开始支持 watermark 来允许用于指定数据的超时时间(即接收时间比 event-time 晚多少),并允许引擎相应的清理旧状态。
3.1.1 基于Watermark处理延迟数据
在数据分析系统中, Structured Streaming 可以持续的按照 event-time 聚合数据, 然而在此过程中并不能保证数据按照时间的先后依次到达. 例如: 当前接收的某一条数据的 event-time 可能远远早于之前已经处理过的 event-time. 在发生这种情况时, 往往需要结合业务需求对延迟数据进行过滤.
现在考虑如果事件延迟到达会有哪些影响. 假如, 一个单词在 12:04(event-time) 产生, 在 12:11 到达应用. 应用应该使用 12:04 来在窗口(12:00 - 12:10)中更新计数, 而不是使用 12:11. 这些情况在我们基于窗口的聚合中是自然发生的, 因为结构化流可以长时间维持部分聚合的中间状态
但是, 如果这个查询运行数天, 系统很有必要限制内存中累积的中间状态的数量. 这意味着系统需要知道何时从内存状态中删除旧聚合, 因为应用不再接受该聚合的后期数据.
为了实现这个需求, 从 spark2.1, 引入了 watermark(水印), 使用引擎可以自动的跟踪当前的事件时间, 并据此尝试删除旧状态.
Spark通过指定 event-time 列和预估事件的延迟时间上限来定义一个查询的 watermark. 针对一个以时间 T 结束的窗口, 引擎会保留状态和允许延迟时间直到(max event time seen by the engine - late threshold > T). 换句话说, 延迟时间在上限内的被聚合, 延迟时间超出上限的开始被丢弃.
可以通过withWatermark() 来定义watermark
watermark 计算: watermark = MaxEventTime - Threshhod
而且, watermark只能逐渐增加, 不能减少
举例:
在 update 模式下, 使用Watermark,则以当前窗口的最大事件时间为标准,减去watermark为界限,在上限之内的数据被保留的状态聚合,超过上限则丢弃。
对于Append模式: Structured Streaming 无法确定后续批次的数据中是否会更新当前批次的内容. 因此, 基于 Append 模式的特点, 到达窗口结束时间时并不会第一时间输出任何数据(因为输出后数据就无法更改了), 直到某个窗口的结束时间小于 watermask, 即可以确定后续数据不会再变更该窗口的聚合结果时才会将其输出, 并移除内存中对应窗口的聚合状态.
**对于Complete模式:**在输出模式是complete的时候(必须有聚合), 要求每次输出所有的聚合结果. 我们使用 watermark 的目的是丢弃一些过时聚合数据, 所以complete模式使用wartermark无效也无意义
3.2 容错语义
提供端到端的exactly-once语义是 Structured Streaming 设计的主要目标. 为了达成这一目的, spark 设计了结构化流数据源, 接收器和执行引擎(Structured Streaming sources, the sinks and the execution engine)以可靠的跟踪处理的进度, 以便能够对任何失败能够重新启动或者重新处理.
每种流数据源假定都有 offsets(类似于 Kafka offsets) 用于追踪在流中的读取位置. 引擎使用 checkpoint 和 WALs 来记录在每个触发器中正在处理的数据的 offset 范围. 结合可重用的数据源(replayable source)和幂等接收器(idempotent sink), Structured Streaming 可以确保在任何失败的情况下端到端的 exactly-once 语义.
3.3 去重
Structured Streaming 去重的关键:为每条数据设计唯一ID。然后采用dropDuplicates去重。
- dropDuplicates 不可用在聚合之后, 即通过聚合得到的 df/ds 不能调用dropDuplicates
- 使用watermask - 如果重复记录的到达时间有上限,则可以在事件时间列上定义水印,并使用guid和事件时间列进行重复数据删除。该查询将使用水印从过去的记录中删除旧的状态数据,这些记录不会再被重复。这限制了查询必须维护的状态量。
- 没有watermask - 由于重复记录可能到达时没有界限,查询将来自所有过去记录的数据存储为状态。
StructuredStreaming Trigger机制
与sparkstreaming基于定时器产生job然后调度的机制不同,structedstreaming实现了一套新的job触发机制(trigger)。
- 基于微批(Micro-Batch)的流处理
Structured Streaming默认使用微批处理执行模型。 这意味着Spark流式计算引擎会定期检查流数据源,并对自上一批次结束后到达的新数据执行批量查询。 在高层次上,它看起来像这样。
在这个体系结构中,Driver驱动程序通过将记录偏移量保存到预写日志中来对数据处理进度设置检查点,然后可以使用它来重新启动查询。 需要注意的是,为了获得确定性的重新执行(deterministic re-executions)和端到端语义,在处理下一个微批数据之前,要将该微批数据中的偏移范围保存到日志中。 所以,当前到达的数据需要等待当前的微批处理作业完成,且其中数据的偏移量范围被计入日志后,才能在下一个微批作业中得到处理。 在细粒度上,时间线看起来像这样。
- 连续处理(Continuous Processing)
在连续处理模式中,Spark不再是启动周期性任务,而是启动一系列连续读取,处理和写入数据的长时间运行的任务。 在高层次上,设置和记录级时间线看起来像这些(与上述微量批处理执行图相对照)。
由于事件在到达时会被立即处理和写入结果,所以端到端延迟只有几毫秒。
此外,我们利用著名的Chandy-Lamport算法对查询进度设置检查点。 特殊标记的记录被注入到每个任务的输入数据流中; 我们将它们称为“时间代标记(epoch marker)”,并将它们之间的差距称为“时间代(epoch)”。当任务遇到标记时,任务异步报告处理后的最后偏移量给driver。 一旦driver程序接收到写入接收器的所有任务的偏移量,它就会将它们写入前述的预写日志。 由于检查点的设置是完全异步的,任务可以不间断地持续并提供一致的毫秒级延迟。
以上是关于Structured Streaming总结的主要内容,如果未能解决你的问题,请参考以下文章
Structured Streaming 实战案例 读取Scoker
Structured Streaming 计算操作And输出操作
数据湖(十六):Structured Streaming实时写入Iceberg
如何使用 Spark Structured Streaming 打印 Json 编码的消息