Apache Spark 3.1 中 Structured Streaming 方面的改进

Posted 过往记忆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Spark 3.1 中 Structured Streaming 方面的改进相关的知识,希望对你有一定的参考价值。

Apache Spark 3.1.x 版本发布到现在已经过了两个多月了,这个版本继续保持使得 Spark 更快,更容易和更智能的目标,Spark 3.1 的主要目标如下:

•提升了 Python 的可用性;•加强了 ANSI SQL 兼容性;•加强了查询优化;•Shuffle hash join 性能提升;•History Server 支持 structured streaming 
更多详情请参见这里。在这篇博文中,我们总结了3.1版本中 Spark Streaming 的显著改进,包括新的流式表(streaming table)API、支持 stream-stream join 和多个 UI 增强。此外,模式验证(schema validation)和对 Apache Kafka 数据源的改进提供了更好的可用性。此外,FileStream source/sink 也进行了各种增强,以提高读/写性能。

新的流式表 API

启动 structured stream 时,连续数据流被认为是无界表(unbounded table)。因此,Table APIs 提供了一种更自然、更方便的方法来处理流查询。在 Spark 3.1 中,社区添加了对 DataStreamReader 和 DataStreamWriter 的支持。我们现在可以直接以表的形式使用这个 API 读取和写入流式 DataFrames。请参见下面的示例:

# Create a streaming DataFrame
src = spark.readStream.format("rate").option("rowPerSecond", 10).load()
# Write the streaming DataFrame to a table
src.writeStream.option("checkpointLocation", checkpointLoc1).toTable("myTable")
# Check the table result
spark.read.table("myTable").show(truncate=30)
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2021-01-19 07:45:23.122|42   |
|2021-01-19 07:45:23.222|43   |
|2021-01-19 07:45:23.322|44   |
...

此外,通过这些新功能,用户可以转换源数据集并写入到一张新表:

# Write to a new table with transformation
spark.readStream.table("myTable").select("value") \\
  .writeStream.option("checkpointLocation", checkpointLoc2) \\
  .format("parquet").toTable("newTable")
# Check the table result
spark.read.table("newTable").show()
+-----+
|value|
+-----+
| 1214|
| 1215|
| 1216|
...

Databricks 推荐在 streaming table APIs 中使用 Delta Lake 格式,因为这种格式将带来以下好处:

•并发压缩由低延迟场景产生的小文件;•多个流作业(或并发批处理作业)支持“仅且一次”(exactly-once)处理;•当使用文件作为流的源时,可以有效地发现哪些文件是新的。

stream-stream 支持更多 Join 类型

在 Spark 3.1 之前,stream-stream join 只支持 inner、left outer 以及 right outer joins。在最新的版本中,社区实现了完整的 full outer 以及 left semi stream-stream join,使 Structured Streaming 支持更多的场景。

•Left semi stream-stream join (SPARK-32862)•Full outer stream-stream join (SPARK-32863)

Kafka 数据源性能提升

在 Spark 3.1 中,社区已经将 Kafka 依赖升级到 2.6.0 (SPARK-32568),这使得用户可以迁移到 Kafka offsets retrieval 新的 API(AdminClient.listOffsets)。它解决了使用旧版本时 Kafka 连接器无限等待的问题 (SPARK-28367)。

模式校验

模式是 Structured Streaming 查询的基本信息。在 Spark 3.1 中,社区为用户输入的模式和内部存储的模式添加了模式验证逻辑:

在查询重启中引入状态模式验证(SPARK-27237)

通过此更新,键和值的模式将存储在 stream 启动时的模式文件(schema files)中。然后,在重新启动查询时,根据现有的键和值模式验证新的键和值模式的兼容性。当字段的数量相同且每个字段的数据类型相同时,状态模式被认为是“兼容的”。注意,这里不会检查字段名,因为 Spark 允许重命名。

这将阻止使用不兼容状态模式的查询运行,从而减少不确定性行为的概率,并提供在错误的时候更多的信息。

为流状态存储引入模式验证(SPARK-31894)

以前,Structured Streaming 直接将检查点(用UnsafeRow表示)放到 StateStore 中,而不需要任何模式验证。当升级到新的 Spark 版本时,检查点文件将被重用。如果没有模式验证,任何与聚合函数相关的更改或 bug 修复都可能导致随机异常,甚至产生错误的结果(参见 SPARK-28067)。现在 Spark 将检验检查点里面的模式,并在迁移过程中重用检查点时抛出 InvalidUnsafeRowException。

Structured Streaming UI 方面的加强

社区在 Spark 3.0 中引入了新的 Structured Streaming UI。在 Spark 3.1 中,社区在 Structured Streaming UI 中添加了对历史记录服务器的支持(Structured Streaming UI(),以及更多关于 streaming 运行时状态的信息,具体如下:

Structured Streaming UI 中的状态信息 (SPARK-33223)

状态信息中添加了四个度量信息:

•Aggregated Number Of Total State Rows•Aggregated Number Of Updated State Rows•Aggregated State Memory Used In Bytes•Aggregated Number Of State Rows Dropped By Watermark

有了这些指标,我们就可以了解状态存储的整体情况。而且根据这些信息我们还可以评估是否需要扩容。

Structured Streaming UI 中 Watermark gap 信息 (SPARK-33224)

Watermark 是状态查询中用户需要跟踪的主要指标之一。它定义了附加模式(append mode)的输出“何时”发出,因此知道 wall clock 和水印(输入数据)之间的差距对于设置输出期望非常有帮助。

Structured Streaming UI 中自定义指标信息(SPARK-33287)

下面显示了在配置 spark.sql.streaming.ui.enabledCustomMetricList 中设置的自定义度量信息:

FileStreamSource/Sink 方面的加强

FileStreamSource/Sink 主要有以下几个方面的加强。

Cache fetched list of files beyond maxFilesPerTrigger as unread files (SPARK-30866)

以前,当设置了 maxFilesPerTrigger 配置时,FileStreamSource 将获取所有可用的文件,根据配置处理有限数量的文件,并在每个微批处理时忽略其他文件。通过这个改进,它将缓存以前批次中获取的文件,并在接下来的批次中处理它们。

Streamline the logic on file stream source and sink metadata log (SPARK-30462)

在此更改之前,每当需要 FileStreamSource/Sink 中的元数据时,元数据日志中的所有信息都被反序列化到 Spark 驱动程序的内存中。通过这个更改,Spark 将尽可能以流式(streamlined)的方式读取和处理元数据日志。

Provide a new option to have retention on output files (SPARK-27188)

FileStreamSink 中有一个新选项用于配置元数据日志文件的保留,这有助于限制长时间运行的 Structured Streaming 查询的元数据日志文件大小的增长。

未来的计划

在下一个主要版本中,社区将继续关注 Spark Structured Streaming 的新功能、性能和可用性改进。如果大家在使用过程中有任何方面的问题可以直接到社区反馈。

本文翻译自《What’s New in Apache Spark™ 3.1 Release for Structured Streaming》https://databricks.com/blog/2021/04/27/whats-new-in-apache-spark-3-1-release-for-structured-streaming.html

以上是关于Apache Spark 3.1 中 Structured Streaming 方面的改进的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark(Java)中列的自定义处理

spark3.3.1 for CDH6.3.2 打包

使用 Apache Spark SQL 将表序列化为嵌套 JSON

Apache Spark RDD(Resilient Distributed Datasets)论文

Apache Spark 2.2.0 中文文档 - Spark RDD(Resilient Distributed Datasets)

SparkCentOS7.5之Spark2.3.1HA安装