删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?

Posted

技术标签:

【中文标题】删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?【英文标题】:Will I lose data while removing the corrupted parquet file writen by spark-structured-streaming? 【发布时间】:2019-05-25 03:12:23 【问题描述】:

我使用 spark-structured-streaming 作为消费者从 kafka 获取数据,按照指南参考 https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

然后将数据作为 parquet 文件保存到 hdfs。

这是我的问题: 该程序运行良好,但一些容器很少失败(但它确实发生了)导致一些损坏的镶木地板文件。它将导致错误,例如 [不是 Parquet 文件(长度太小:4)] 或 [.parquet 不是 Parquet 文件。尾部的预期幻数 [80, 65, 82, 49] 但发现 [56, 52, 53, 51]] 阅读它们时。 我必须将它们移动到其他目录并确保来自 hive 的查询运行良好。但我不确定是否会因为搬家而导致数据丢失。

我知道 spark-structured-streaming 使用检查点来恢复,但由于一些数据已写入 parquet,我不确定偏移量是否标记为已提交。

【问题讨论】:

【参考方案1】:

我做了一个非常基本的练习,将一个 txt 文件加载到 Spark 结构化流读取的文件目录中。结构化流的 writestream 正在写入 parquet 文件。加载两个文件后,我看到 spark 生成的元数据提到了这两个文件。因此,如果您删除其中一个(包括使用文件接收器创建的元数据文件),则从 HDFS 读取 parquet 文件会失败,并出现异常(找不到文件)。

scala> val ParquetDF1 = spark.read.parquet("/user/root/sink2")
19/05/29 09:57:27 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 13.0 (TID 19, quickstart.cloudera, executor 2): org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
        at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:290)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:537)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:610)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:602)

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/sink2/part-00000-454836ef-f7bc-444e-9a6b-e81e640a196d-c000.snappy.parquet
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2092)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2062)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1975)

这里唯一的区别是-您使用的是 Hive,而我直接从 HDFS 构建 Parquet 数据帧。

【讨论】:

可能我的问题没有表达清楚。首先,我不会移动元数据目录,因为我只移动以“part-”开头并以“.parquet”结尾的文件 其次,我真正担心的是偏移量。是否会在程序重新启动时准确地继续记录在最后的位置

以上是关于删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?的主要内容,如果未能解决你的问题,请参考以下文章

删除由 git 创建的大型 .pack 文件

删除父对象时如何删除由stream.flatMap创建的列表中的子对象?

删除由表中记录数给出的记录数

如何仅删除由特定 matlab 脚本创建的变量

删除由动态变化的字符串前置的脚本类型属性

从由外部脚本生成的元素中删除 DOM 元素