Spark Structured Streaming - 由于增加输入源的数量,检查点中的 AssertionError

Posted

技术标签:

【中文标题】Spark Structured Streaming - 由于增加输入源的数量,检查点中的 AssertionError【英文标题】:Spark Structured Streaming - AssertionError in Checkpoint due to increasing the number of input sources 【发布时间】:2021-01-25 08:05:13 【问题描述】:

我正在尝试将两个流合并为一个并将结果写入主题

代码: 1-阅读两个主题

val PERSONINFORMATION_df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xx:9092")
    .option("subscribe", "PERSONINFORMATION")
    .option("group.id", "info")
    .option("maxOffsetsPerTrigger", 1000)
    .option("startingOffsets", "earliest")
    .load()


val CANDIDATEINFORMATION_df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xxx:9092")
    .option("subscribe", "CANDIDATEINFORMATION")
    .option("group.id", "candent")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1000)
    .option("failOnDataLoss", "false")
    .load()

2- 解析数据以加入它们:

val parsed_PERSONINFORMATION_df: DataFrame = PERSONINFORMATION_df
      .select(from_json(expr("cast(value as string) as actualValue"), schemaPERSONINFORMATION).as("s")).select("s.*")

   val parsed_CANDIDATEINFORMATION_df: DataFrame = CANDIDATEINFORMATION_df
      .select(from_json(expr("cast(value as string) as actualValue"), schemaCANDIDATEINFORMATION).as("s")).select("s.*")

   val df_person = parsed_PERSONINFORMATION_df.as("dfperson")
   val df_candidate = parsed_CANDIDATEINFORMATION_df.as("dfcandidate")

3- 加入两个框架

  val joined_df : DataFrame = df_candidate.join(df_person, col("dfcandidate.PERSONID") === col("dfperson.ID"),"inner")

  val string2json: DataFrame = joined_df.select($"dfcandidate.ID".as("key"),to_json(struct($"dfcandidate.ID", $"FULLNAME", $"PERSONALID")).cast("String").as("value"))

4- 将它们写入一个主题

  string2json.writeStream.format("kafka")
      .option("kafka.bootstrap.servers", xxxx:9092")
      .option("topic", "toDelete")
      .option("checkpointLocation", "checkpoints")
      .option("failOnDataLoss", "false")
      .start()
      .awaitTermination()

错误信息:

    21/01/25 11:01:41 ERROR streaming.MicroBatchExecution: Query [id = 9ce8bcf2-0299-42d5-9b5e-534af8d689e3, runId = 0c0919c6-f49e-48ae-a635-2e95e31fdd50] terminated with error
java.lang.AssertionError: assertion failed: There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue.
       

【问题讨论】:

【参考方案1】:

您的代码对我来说看起来不错,而是导致问题的检查点。

根据您收到的错误消息,您可能只使用一个流源来运行此作业。然后,您添加了流连接的代码并尝试在不删除现有检查点文件的情况下重新启动应用程序。现在,应用程序尝试从检查点文件中恢复,但意识到您最初只有一个来源,而现在您有两个来源。

Recovery Semantics after Changes in a Streaming Query 部分解释了使用检查点时允许和不允许哪些更改。不允许更改输入源的数量:

“输入源的数量或类型(即不同的源)的变化:这是不允许的。”

解决您的问题:删除当前检查点文件并重新开始作业。

【讨论】:

以上是关于Spark Structured Streaming - 由于增加输入源的数量,检查点中的 AssertionError的主要内容,如果未能解决你的问题,请参考以下文章

Spark Structured Streaming

Spark Structured Streaming

Spark Structured Streaming - 1

删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?

无法使用Spark Structured Streaming在Parquet文件中写入数据

如何使用Spark Structured Streaming连续监视目录