Spark Structured Streaming - 1
Posted 959_1x
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Structured Streaming - 1相关的知识,希望对你有一定的参考价值。
文章目录
1.Spark Streaming和Structured Streaming
1.1 Spark Streaming时代
是RDD的API的流式工具, 其本质还是RDD, 存储和执行过程依然类似RDD
1.2 Structured Streaming时代
是Dataset的API的流式工具,API和Dataset保持高度一致
1.3 Spark Streaming和Structured Streaming
- 进步就类似于Dataset相比于RDD的进步
- Structured Streaming已经支持了连续流模型, 也就是类似于Flink那样的实时流, 而不是小批量, 但在使用的时候仍然有限制, 大部分情况还是应该采用小批量模式
- 在2.2.0以后Structured Streaming被标注为稳定版本, 意味着以后的Spark流式开发不应该在采用Spark Streaming了
Spark Streaming:
- 基于微批,延迟高不能做到真正的实时
- DStream基于RDD,不直接支持SQL
- 流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)
- 不支持EventTime事件时间
注:
EventTime事件时间 :事件真正发生的事件
PorcessingTime处理时间:事件被流系统处理的时间
IngestionTime摄入时间:事件到底流系统的时间
如: 一条错误日志10月1日,23:59:00秒产生的(事件时间),因为网路延迟,到10月2日 00:00:10到达日志处理系统(摄入时间),10月2日 00:00:20被流系统处理(处理时间)
如果要统计10月1日的系统bug数量,那么SparkStreaming不能正确统计,因为它不支持事件时间 - 数据的Exactly-Once(恰好一次语义)需要手动实现
注: 数据的一致性语义
最多一次
恰好一次–是我们的目标,SparkStreaming如果要实现恰好一次,需要手动维护偏移量+其他操作
最少一次
Structured Streaming:
- 编程模型: 动态表格/无界表
- 数据抽象: DataFrame/DataSet
- 与Spark Sql 无缝连接
2.word Count
object SocketWordCount
def main(args: Array[String]): Unit =
// 1. 创建SparkSession
val spark = SparkSession.builder()
.master("local[6]")
.appName("socket word count")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
// 2. 读取外部数据源,并转为Dataset[String]
val source: Dataset[String] = spark.readStream
.format("socket")
.option("host", "192.168.88.100")
.option("port", 7777)
.load()
.as[String]
// 3. 统计词频
val words = source.flatMap(_.split(" "))
.map((_, 1))
.groupByKey(_._1)
.count()
// 4. 输出结果
words.writeStream
.outputMode(OutputMode.Complete()) // 统计全局结果,而不是一个批次
.format("console")
.start()
.awaitTermination()
- Structured Streaming 依然是小批量的流处理
- Structured Streaming 的输出是类似 DataFrame 的, 也具有 Schema, 所以也是针对结构化数据进行优化的
- 从输出的时间特点上来看, 是一个批次先开始, 然后收集数据, 再进行展示, 这一点和 Spark Streaming 不太一样
3.Structured Streaming的体系和结构
3.1 无限扩展的表格
Spark中的DS有两种:
- 处理静态批量数据的DS:使用read和write进行读写
- 处理动态实时流的DS:使用readStream和writeStream进行读写
如何使用DS表示流式计算
- 可以把流式的数据想象成一个不断增长,无限无界的表
- 无论是否有界,全部都使用DS这一套的API,所以可以保障流和批的处理使用完全相同的代码
3.2 体系结构
StreamExecution
StreamExecution在流上进行基于Dataset的查询, 也就是说,Dataset之所以能够在流上进行查询, 是因为StreamExecution的调度和管理。
增量查询:
- Structured Streaming虽然从API角度上模拟出来的是一个无限扩展的表,但不能无限存储,并且历史数据中有很多是冗余的,所以要做增量存储。
- 所以这里设置了一个全局范围的高可用StateStore,这个时候针对增量的查询变为如下步骤:
1)从StateStore中取出上次执行完成后的状态
2)把上次执行的结果加入本批次,再进行计算,得出全局结果
3)将当前批次的结果放入StateStore中,留待下次使用
以上是关于Spark Structured Streaming - 1的主要内容,如果未能解决你的问题,请参考以下文章
Spark Structured Streaming - 1
删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?