删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?
Posted
技术标签:
【中文标题】删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?【英文标题】:Will I lose data while removing the corrupted parquet file writen by spark-structured-streaming? 【发布时间】:2019-05-25 03:12:23 【问题描述】:我使用 spark-structured-streaming 作为消费者从 kafka 获取数据,按照指南参考 https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
然后将数据作为 parquet 文件保存到 hdfs。
这是我的问题: 该程序运行良好,但一些容器很少失败(但它确实发生了)导致一些损坏的镶木地板文件。它将导致错误,例如 [不是 Parquet 文件(长度太小:4)] 或 [.parquet 不是 Parquet 文件。尾部的预期幻数 [80, 65, 82, 49] 但发现 [56, 52, 53, 51]] 阅读它们时。 我必须将它们移动到其他目录并确保来自 hive 的查询运行良好。但我不确定是否会因为搬家而导致数据丢失。
我知道 spark-structured-streaming 使用检查点来恢复,但由于一些数据已写入 parquet,我不确定偏移量是否标记为已提交。
【问题讨论】:
【参考方案1】:我做了一个非常基本的练习,将一个 txt 文件加载到 Spark 结构化流读取的文件目录中。结构化流的 writestream 正在写入 parquet 文件。加载两个文件后,我看到 spark 生成的元数据提到了这两个文件。因此,如果您删除其中一个(包括使用文件接收器创建的元数据文件),则从 HDFS 读取 parquet 文件会失败,并出现异常(找不到文件)。
scala> val ParquetDF1 = spark.read.parquet("/user/root/sink2")
19/05/29 09:57:27 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 13.0 (TID 19, quickstart.cloudera, executor 2): org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:290)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:537)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:610)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:602)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/sink2/part-00000-454836ef-f7bc-444e-9a6b-e81e640a196d-c000.snappy.parquet
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2092)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2062)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1975)
这里唯一的区别是-您使用的是 Hive,而我直接从 HDFS 构建 Parquet 数据帧。
【讨论】:
可能我的问题没有表达清楚。首先,我不会移动元数据目录,因为我只移动以“part-”开头并以“.parquet”结尾的文件 其次,我真正担心的是偏移量。是否会在程序重新启动时准确地继续记录在最后的位置以上是关于删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?的主要内容,如果未能解决你的问题,请参考以下文章