Spark Structured Streaming - 1

Posted 959_1x

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Structured Streaming - 1相关的知识,希望对你有一定的参考价值。


文章目录

1.Spark Streaming和Structured Streaming

1.1 Spark Streaming时代

是RDD的API的流式工具, 其本质还是RDD, 存储和执行过程依然类似RDD

Spark

1.2 Structured Streaming时代

是Dataset的API的流式工具,API和Dataset保持高度一致

Spark

Spark

1.3 Spark Streaming和Structured Streaming

  1. 进步就类似于Dataset相比于RDD的进步
  2. Structured Streaming已经支持了连续流模型, 也就是类似于Flink那样的实时流, 而不是小批量, 但在使用的时候仍然有限制, 大部分情况还是应该采用小批量模式
  3. 在2.2.0以后Structured Streaming被标注为稳定版本, 意味着以后的Spark流式开发不应该在采用Spark Streaming了

Spark Streaming:

  1. 基于微批,延迟高不能做到真正的实时
  2. DStream基于RDD,不直接支持SQL
  3. 流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)
  4. 不支持EventTime事件时间
    注:
    EventTime事件时间 :事件真正发生的事件
    PorcessingTime处理时间:事件被流系统处理的时间
    IngestionTime摄入时间:事件到底流系统的时间
    如: 一条错误日志10月1日,23:59:00秒产生的(事件时间),因为网路延迟,到10月2日 00:00:10到达日志处理系统(摄入时间),10月2日 00:00:20被流系统处理(处理时间)
    如果要统计10月1日的系统bug数量,那么SparkStreaming不能正确统计,因为它不支持事件时间
  5. 数据的Exactly-Once(恰好一次语义)需要手动实现
    注: 数据的一致性语义
    最多一次
    恰好一次–是我们的目标,SparkStreaming如果要实现恰好一次,需要手动维护偏移量+其他操作
    最少一次

Structured Streaming:

  1. 编程模型: 动态表格/无界表
  2. 数据抽象: DataFrame/DataSet
  3. 与Spark Sql 无缝连接

2.word Count

object SocketWordCount 
def main(args: Array[String]): Unit =
// 1. 创建SparkSession
val spark = SparkSession.builder()
.master("local[6]")
.appName("socket word count")
.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._

// 2. 读取外部数据源,并转为Dataset[String]
val source: Dataset[String] = spark.readStream
.format("socket")
.option("host", "192.168.88.100")
.option("port", 7777)
.load()
.as[String]

// 3. 统计词频
val words = source.flatMap(_.split(" "))
.map((_, 1))
.groupByKey(_._1)
.count()

// 4. 输出结果
words.writeStream
.outputMode(OutputMode.Complete()) // 统计全局结果,而不是一个批次
.format("console")
.start()
.awaitTermination()

Spark


Spark

  • Structured Streaming 依然是小批量的流处理
  • Structured Streaming 的输出是类似 DataFrame 的, 也具有 Schema, 所以也是针对结构化数据进行优化的
  • 从输出的时间特点上来看, 是一个批次先开始, 然后收集数据, 再进行展示, 这一点和 Spark Streaming 不太一样

3.Structured Streaming的体系和结构

3.1 无限扩展的表格

Spark中的DS有两种:

  1. 处理静态批量数据的DS:使用read和write进行读写
  2. 处理动态实时流的DS:使用readStream和writeStream进行读写

如何使用DS表示流式计算

  1. 可以把流式的数据想象成一个不断增长,无限无界的表
  2. 无论是否有界,全部都使用DS这一套的API,所以可以保障流和批的处理使用完全相同的代码

Spark

Spark

3.2 体系结构

StreamExecution

StreamExecution在流上进行基于Dataset的查询, 也就是说,Dataset之所以能够在流上进行查询, 是因为StreamExecution的调度和管理。

Spark


增量查询:

Spark

  1. Structured Streaming虽然从API角度上模拟出来的是一个无限扩展的表,但不能无限存储,并且历史数据中有很多是冗余的,所以要做增量存储。
  2. 所以这里设置了一个全局范围的高可用StateStore,这个时候针对增量的查询变为如下步骤:
    1)从StateStore中取出上次执行完成后的状态
    2)把上次执行的结果加入本批次,再进行计算,得出全局结果
    3)将当前批次的结果放入StateStore中,留待下次使用


以上是关于Spark Structured Streaming - 1的主要内容,如果未能解决你的问题,请参考以下文章

Spark Structured Streaming

Spark Structured Streaming

Spark Structured Streaming - 1

删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?

无法使用Spark Structured Streaming在Parquet文件中写入数据

如何使用Spark Structured Streaming连续监视目录