Spark Structured Streaming - 此查询不支持从检查点位置恢复

Posted

技术标签:

【中文标题】Spark Structured Streaming - 此查询不支持从检查点位置恢复【英文标题】:Spark Structured Streaming - This query does not support recovering from checkpoint location 【发布时间】:2021-01-25 06:56:43 【问题描述】:

出于学习目的,我正在尝试对检查点进行一些实验/测试。

但我获得的选项有限,无法查看内部组件的工作情况。我正在尝试从套接字读取。

val lines: DataFrame = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 12345)
  .load()

并对其进行一些我需要检查点的状态操作

第一季度。使用检查点位置作为我的本地系统,它无法读取检查点并给出错误

This query does not support recovering from checkpoint location. Delete src/testC/offsets to start over.;

它每次都会创建一个运行查询的新检查点。如何使用我的本地系统作为检查点以进行测试/实验?

(所以我选择了hdfs)

第二季度。当 hdfs 作为检查点时,它在我的本地系统而不是 hdfs 中创建检查点,如何使其成为 hdfs 的检查点? (顺便说一句,通过 hdfs 配置)

df.writeStream
  .option("checkpointLocation","/mycheckLoc")
  .option("hdfs_url" -> "hdfs://localhost:9000/hdoop"),
  .option("web_hdfs_url" -> "webhdfs://localhost:9870/hdoop")

第三季度。我们是否需要在每个df.writeStream 选项中提供检查点,即我们也可以传入spark.sparkContext.setCheckpointDir(checkpointLocation) 对吗?

【问题讨论】:

【参考方案1】:

您收到此错误“此查询不支持从检查点位置恢复”,因为socket readStream 不是可重播源,因此不允许使用任何检查点。您需要确保在您的 writeStream 中根本不使用选项 checkpointLocation

通常,您可以使用file:///path/to/dirhdfs:///path/to/dir 来区分本地文件系统和hdfs 位置。

确保您的应用程序用户拥有写入和读取这些位置的所有权限。此外,您可能更改了代码库,在这种情况下应用程序无法从检查点文件中恢复。您可以在Recovery Semantics after Changes in a Streaming Query 上的结构化流编程指南中了解结构化流作业中允许不允许 的更改。

为了让 Spark 了解您的 HDFS,您需要在 Spark 的类路径中包含两个 Hadoop 配置文件:

hdfs-site.xml 为 HDFS 客户端提供默认行为;和 core-site.xml 设置默认文件系统名称。

通常,它们存储在“/etc/hadoop/conf”中。要使这些文件对 Spark 可见,您需要将 $SPARK_HOME/spark-env.sh 中的 HADOOP_CONF_DIR 设置为包含配置文件的位置。

[出自《Spark——权威指南》一书]

“我们是否需要在每个df.writeStream 选项中提供检查点,即我们也可以传入spark.sparkContext.setCheckpointDir(checkpointLocation) 对吗?”

理论上,您可以为 SQLContext 中的所有查询集中设置检查点位置,但强烈建议为每个流设置唯一的检查点位置。 Structured Streaming in Production 上的 Databricks 博客说:

“此检查点位置保留了唯一标识查询的所有基本信息。因此,每个查询必须具有不同的检查点位置,并且多个查询不应具有相同的位置。

“作为最佳实践,我们建议您始终指定 checkpointLocation 选项。”

【讨论】:

嗨@mike,感谢您的回答,但即使使用本地文件系统或hdfs,一个问题仍然存在。它给出了一个错误Exception in thread "main" org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete hdfs://127.0.0.1:9000/c1/offsets to start over.; 。我在这里缺少什么..? 抱歉,忘记我之前的评论了。您只能将检查点用于“可重复播放”的来源。 “socket”源是不可重播的,所以你根本不能在这里使用 checkpointintg。 是的,你是对的..socket 将无法工作。我认为kafka会很好 是的,对于 Kafka 来说,它会工作得非常好!只要确保每个输出流都有一个检查点位置,以防您在同一个应用程序中有多个查询。 嗨@mike,你能不能也偷偷溜进去一下:***.com/questions/65903327/…

以上是关于Spark Structured Streaming - 此查询不支持从检查点位置恢复的主要内容,如果未能解决你的问题,请参考以下文章

Spark Structured Streaming

Spark Structured Streaming

Spark Structured Streaming - 1

删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?

无法使用Spark Structured Streaming在Parquet文件中写入数据

如何使用Spark Structured Streaming连续监视目录