PySpark 和 Kafka “Set 已消失。某些数据可能已丢失……”

Posted

技术标签:

【中文标题】PySpark 和 Kafka “Set 已消失。某些数据可能已丢失……”【英文标题】:PySpark and Kafka "Set are gone. Some data may have been missed.." 【发布时间】:2021-03-03 11:04:24 【问题描述】:

我正在本地模式下使用 Spark 集群运行 PySpark,并且我正在尝试将流式 DataFrame 写入 Kafka 主题。

当我运行查询时,我收到以下消息:

java.lang.IllegalStateException: Set(topicname-0) are gone. Some data may have been missed.. 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".

这是我的代码:

query = (
    output_stream
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "ratings-cleaned")
    .option("checkpointLocation", "checkpoints-folder")
    .start()
)
sleep(2)
print(query.status)

【问题讨论】:

【参考方案1】:

此错误消息提示检查点存在问题。在开发过程中,这可能是由于使用带有更新查询的旧检查点文件夹造成的。

如果这是在开发环境中并且您不需要保存上一个查询的状态,您可以删除检查点文件夹(代码示例中的checkpoints-folder)并重新运行您的查询。

【讨论】:

【参考方案2】:

自上次运行查询以来从源主题中删除了一些消息/偏移量时,通常会显示此错误消息。删除是由于清理政策(例如保留时间)而发生的。

假设您的主题包含偏移量为 0、1、2 的消息,这些消息均已由应用程序处理。检查点文件存储最后一个偏移量 2 以记住在下次启动时继续使用偏移量 3。

一段时间后,偏移量为 3、4、5 的消息被生成到主题,但偏移量为 0、1、2、3 的消息由于保留而从主题中删除。

现在,当重新启动 spark 结构化流作业时,它会尝试根据其检查点文件获取 3,但意识到只有偏移量为 4 的消息可用。在这种情况下,它会抛出这个异常。

你可以解决这个问题

readStream 操作中设置.option("failOnDataLoss", "false"),或者 删除现有的检查点文件

根据Structured Streaming + Kafka Integration Guide 选项failOnDataLoss 描述为:

"当数据可能丢失(例如,主题被删除或偏移量超出范围)时是否使查询失败。这可能是一个误报。当它不能正常工作时,您可以禁用它预期。如果由于丢失数据而无法从提供的偏移量中读取任何数据,则批处理查询将始终失败。"

【讨论】:

谢谢!我的 readStream 代码没有指定检查点文件夹。这是否意味着整个查询使用相同的检查点位置?我有点困惑,因为并非所有输出流格式都需要指定检查点位置,所以我认为它是特定于输出流的。 readStream 是一个惰性操作,只有在使用查询的 writeStrem(连同start)调用时才会执行。只有在 writeStream 中定义检查点位置就足够了。请记住,如果这些写入查询都使用不同的检查点位置,您甚至可以拥有多个 writeStreams 都基于单个 readStream。【参考方案3】:

除了上述答案之外,Bartosz Konieczny 还发布了a more detailed reason。错误消息的第一部分说Set() 是空的;那是一组主题分区(因此最后是-0)。这意味着 Spark 集群订阅的分区已被删除。我的猜测是 Kafka 设置已重新启动。 Spark 查询正在使用一些默认检查点文件夹,假设 Kafka 设置未重新启动。

【讨论】:

以上是关于PySpark 和 Kafka “Set 已消失。某些数据可能已丢失……”的主要内容,如果未能解决你的问题,请参考以下文章

将 Pyspark 与 Kafka 连接起来

从 Docker 容器将 PySpark 连接到 Kafka

Kafka和Pyspark整合

我可以使用 spark 2.3.0 和 pyspark 从 Kafka 进行流处理吗?

pyspark.sql.utils.AnalysisException:找不到数据源:kafka

pySpark Kafka Direct Streaming 更新 Zookeeper / Kafka Offset