如何对 Spark Streaming 生成的分区 parquet 文件进行适当的内务管理

Posted

技术标签:

【中文标题】如何对 Spark Streaming 生成的分区 parquet 文件进行适当的内务管理【英文标题】:How to do proper housekeeping of partitioned parquet files generated from Spark Streaming 【发布时间】:2019-04-11 17:01:53 【问题描述】:

我的 Spark 结构化流式传输作业不断生成我想在过期后删除的 parquet 文件(比如说 30 天后)。

我存储我的 parquet 数据,分区键是 RFC3339/ISO8601 中的事件日期,以便基于 cron 作业在 HDFS 级别上相当容易地完成内务管理(删除所有具有 partitionkey

但是,自从我介绍了 Spark Streaming,Spark 将元数据写入到要写入的数据本身旁边名为 _spark_metadata 的文件夹中。如果我现在只是删除过期的 HDFS 文件并在整个数据集上运行 spark 批处理作业,作业将由于找不到文件而失败。批处理作业将读取元数据并期望已删除的文件存在。

对此的简单解决方案是禁用 _spark_metadata 目录的创建,如下所述:disabling _spark_metadata in Structured streaming in spark 2.3.0。但由于我不想在读取数据以进行常规批处理分析时失去性能,我想知道是否没有更好的解决方案。

我想,我可以只使用 spark 进行删除,以便它删除 parquet hdfs 文件 并且 更新元数据。但是,只需执行一个

session.sql(String.format("DELETE FROM parquet.`%s` WHERE partitionKey < " + oldestAllowedPartitionAge, path.toString()));

不起作用。 DELETE 很遗憾是 Spark 中不支持的操作...

有什么解决方案可以让我删除旧数据但_spark_metadata 文件夹仍然可以工作吗?

【问题讨论】:

【参考方案1】:

据我了解,_spark_metadata 的主要目的是确保容错性并避免列出所有要处理的文件:

为了正确处理部分故障,同时保持 恰好一次语义,每个批次的文件被写出到一个 唯一目录,然后自动附加到元数据日志。什么时候 一个基于镶木地板的DataSource 被初始化以供阅读,我们首先 检查此日志目录并在以下情况下使用它而不是文件列表 现在。

https://github.com/apache/spark/commit/6bc4be64f86afcb38e4444c80c9400b7b6b745de

您引用的链接 (disabling _spark_metadata in Structured streaming in spark 2.3.0) 解释说问题来自不一致的检查点状态 - 检查点生成元数据,但后来用户手动删除了它,当他重新启动查询时,它失败了,因为检查点应该有元数据文件。

要查看缺少元数据是否会导致批处理失败,请查看 org.apache.spark.sql.execution.datasources.DataSource#resolveRelation 方法,您可以在其中找到 2 种情况下的模式匹配:

  // We are reading from the results of a streaming query. Load files from the metadata log
  // instead of listing them using HDFS APIs.
  case (format: FileFormat, _)
      if FileStreamSink.hasMetadata(
        caseInsensitiveOptions.get("path").toSeq ++ paths,
        sparkSession.sessionState.newHadoopConf()) =>
  case (format: FileFormat, _) =>
    val globbedPaths =
      checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)

hasMetadata 方法看起来像:

  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = 
    path match 
      case Seq(singlePath) =>
        try 
          val hdfsPath = new Path(singlePath)
          val fs = hdfsPath.getFileSystem(hadoopConf)
          if (fs.isDirectory(hdfsPath)) 
            fs.exists(new Path(hdfsPath, metadataDir))
           else 
            false
          
         catch 
          case NonFatal(e) =>
            logWarning(s"Error while looking for metadata directory.")
            false
        
      case _ => false
    
  

如您所见,没有失败的风险(至少通过阅读代码!)。如果您有一些,请提供更多上下文,因为问题可能出在其他地方。

关于你的性能问题,这个_spark_metadata 只包含文件列表,所以当然,Spark 首先需要列出你输入目录中的文件。但根据我的经验,这不是最昂贵的操作。例如,在 AWS S3 上列出包含 1297 个文件的目录大约需要 9 秒。之后,由您决定是想要一个简单的清洁过程还是稍微慢一些的批处理。如果您有更多这样的文件,也许您还应该将它们分组为更大的文件,例如 256 MB 或更大?

不过,如果您想保留_spark_metadata,也许有一种方法可以通过您的清理应用程序删除文件。但这将具有挑战性,因为您将有 2 个应用程序(流式传输和清理)处理相同的数据。

您可以在此处找到有关_spark_metadata 的更多信息:How to change the location of _spark_metadata directory?

【讨论】:

感谢您提供这些信息。删除 _spark_metadata 后,我的批处理再次运行,所以我很确定它们是这里的问题。但我不知道它们是 Exactly-Once-Semantics 所必需的,因此禁用它们可能不是一个好主意:) 所以现在我转而在批处理作业中使用另一种镶木地板加载机制 (***.com/questions/53479585/…) 以便 _spark_metadata保留,但批处理作业继续工作。【参考方案2】:

这实际上是结构化流 (SPARK-24295) 中的已知问题之一,尽管它只发生在大量输入文件中,并且最终用户正在采取自己的解决方法。例如,停止查询 -> 删除旧的输入文件 -> 手动操作元数据以清除它们 -> 重新启动查询。

鉴于手动操作元数据并非微不足道且不理想(假设它应该停止流式查询,并强制最终用户了解元数据的格式),SPARK-27188 被提议作为替代方案 - 它应用保留并清除过时的输入文件来自元数据。

【讨论】:

感谢您指出适当的问题。由于这些票证仍处于开放状态,因此我得出结论,目前还没有合适的解决方案,只有一些解决方法。我现在选择了 (***.com/questions/53479585/…)。这暂时回答了我的问题,我希望这些票能很快得到解决:)【参考方案3】:

据我了解,这有三个选项可以解决这个问题:

1) 使用spark.load(filePathsUsingGlobRegex) 只加载需要读取的文件,这样spark 不需要加载所有文件,因此不需要spark_metadata。

优点:您仍然可以从 spark_metadata 中受益(读取速度更快,仍然可以确保精确一次语义)

缺点:您必须自己构建文件的路径,如果您将数据存储在各种分区策略中,这可能会更加混乱。

2)不要在输出目录disabling _spark_metadata in Structured streaming in spark 2.3.0创建spark_metadata

优点:清理很简单

缺点:您失去了 spark_metadata 的好处。

3) 了解并更新 spark_metadata 文件,同时升级删除旧文件。

优点:你同时拥有保留工作和 spark_metadata 的好处。

缺点:您必须手动更改 _spark_metadata,这可能是一个难以维护/混乱的代码。鉴于这是内部的火花并且可以改变。

【讨论】:

以上是关于如何对 Spark Streaming 生成的分区 parquet 文件进行适当的内务管理的主要内容,如果未能解决你的问题,请参考以下文章

Spark Structured Streaming - groupByKey 按分区单独

Spark Streaming Dataframe 执行,有状态,分区本地 groupBy,避免洗牌

在 Spark Structured Streaming 中从中间读取现有多级分区文件数据的问题

解释Spark Structured Streaming执行程序和Kafka分区之间的映射

Spark版本定制八:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考

低调对比Spark Streaming与Flink