如何将 Spark Streaming 检查点位置存储到 S3 中?

Posted

技术标签:

【中文标题】如何将 Spark Streaming 检查点位置存储到 S3 中?【英文标题】:How to store Spark Streaming Checkpoint Location into S3? 【发布时间】:2020-06-18 01:24:09 【问题描述】:

我对获取 S3 parquet 数据并将 parquet 数据写入 S3 的 Spark Streaming 应用程序 (Spark v2.3.2) 感兴趣。应用程序的数据帧流利用groupByKey()flatMapGroupsWithState() 来利用GroupState

是否可以将其配置为使用 s3 检查点位置?例如:

val stream = myDataset.writeStream
    .format("parquet")
    .option("path", s3DataDestination)
    .option("checkpointLocation", s3CheckpointPath)
    .option("truncate", false)
    .option(Trigger.Once)
    .outputMode(OutputMode.Append)
stream.start().awaitTermination()

我确认以上是能够成功写入数据到s3DataDestination的。

但是,写入 s3 检查点位置时会引发异常:

java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0, part=9), dir=s3://<my_s3_location>
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(...)
...
Caused by: java.io.IOException: Failed to rename s3://.../checkpoint/state/0/9/temp... to s3://.../checkpoint/state/0/9/1.delta

这是否需要自定义实现 S3 StateStoreProvider?或者,检查点位置是否需要写入HDFS?

【问题讨论】:

【参考方案1】:

问题是读写并发频率太高。 AWS S3 不提供此类功能。

解决方案:

我们必须切换到本地安装的永久性磁盘以进行 Spark 检查点 S3Guard:这将使 S3 的读写更加一致(注意:这是实验性的,我个人从未见过它实际使用过) 使用 HDFS

【讨论】:

以上是关于如何将 Spark Streaming 检查点位置存储到 S3 中?的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming 检查点到 amazon s3

大数据Spark Streaming Queries

spark-streaming scala:如何将字符串数组传递给过滤器?

如何在 Spark Streaming 中自动重启故障节点?

驱动程序重新启动后 Spark Streaming 检查点不起作用

Spark Structured Streaming - 由于增加输入源的数量,检查点中的 AssertionError