结构化流增量文件不存在

Posted

技术标签:

【中文标题】结构化流增量文件不存在【英文标题】:Structured Streaming delta file does not exist 【发布时间】:2019-01-28 02:48:59 【问题描述】:

我正在运行 spark2.2.1 结构化流,由于文件不存在,程序在一段时间后失败,我在enter link description here 找到了这个 ,但它对我不起作用。然后我认为问题可能是检查点,我将代码更改为以下 `

    Dataset<Row> df = this.spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", bootstrapServers)
                .option("subscribe", topic)
                .option("startingOffsets", startingOffsets)
                .option("failOnDataLoss", "false")
                .load();
                ……
 StreamingQuery start = result.writeStream()
                .foreach(new CrossVhcLaneForeach(kafkaProperties, laneTopic))
                .outputMode("update")
                .option("checkpointLocation", this.checkPointLocation+"/laneDir")
                .trigger(Trigger.ProcessingTime(Long.parseLong(delayTime),TimeUnit.SECONDS))
                .start();

` 但是随后程序将处于一种假死状态,它不会停止运行,也不会报错,希望有人有办法帮助我。谢谢。 我用过java1.8,spark2.2.1standalone,hadoo2.7.3,遇到的错误如下:


19/01/24 10:50:22 INFO TaskSetManager: Starting task 5.1 in stage 13.0 (TID 979, 34.55.0.164, executor 1, partition 5, AN
Y, 4730 bytes)19/01/24 10:50:22 WARN TaskSetManager: Lost task 4.0 in stage 13.0 (TID 976, 34.55.0.164, executor 1): java.lang.IllegalS
tateException: Error reading delta file /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/4/1.delta of HDFSStateStoreProvider[id = (op=0, part=4), dir = /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/4]: /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/4/1.delta does not exist        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$str
eaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$exec
ution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$exec
ution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$exec
ution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$exec
ution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$str
eaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvi
der.scala:265)        at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:200)
        at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/4/1.delta does
 not exist        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
        at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$str
eaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:407)        ... 21 more

19/01/24 10:50:22 INFO TaskSetManager: Starting task 4.1 in stage 13.0 (TID 980, 34.55.0.164, executor 1, partition 4, AN
Y, 4730 bytes)19/01/24 10:50:22 INFO TaskSetManager: Lost task 3.1 in stage 13.0 (TID 978) on 34.55.0.164, executor 1: java.lang.Illega
lStateException (Error reading delta file /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/3/1.delta of HDFSStateStoreProvider[id = (op=0, part=3), dir = /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/3]: /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/3/1.delta does not exist) [duplicate 1]19/01/24 10:50:22 INFO TaskSetManager: Starting task 3.2 in stage 13.0 (TID 981, 34.55.0.164, executor 1, partition 3, AN
Y, 4730 bytes)19/01/24 10:50:22 INFO TaskSetManager: Lost task 5.1 in stage 13.0 (TID 979) on 34.55.0.164, executor 1: java.lang.Illega
lStateException (Error reading delta file /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/5/1.delta of HDFSStateStoreProvider[id = (op=0, part=5), dir = /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/5]: /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/5/1.delta does not exist) [duplicate 1]19/01/24 10:50:22 INFO TaskSetManager: Starting task 5.2 in stage 13.0 (TID 982, 34.55.0.164, executor 1, partition 5, AN
Y, 4730 bytes)19/01/24 10:50:22 INFO TaskSetManager: Lost task 3.2 in stage 13.0 (TID 981) on 34.55.0.164, executor 1: java.lang.Illega
lStateException (Error reading delta file /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/3/1.delta of HDFSStateStoreProvider[id = (op=0, part=3), dir = /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/3]: /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/3/1.delta does not exist) [duplicate 2]19/01/24 10:50:22 INFO TaskSetManager: Starting task 3.3 in stage 13.0 (TID 983, 34.55.0.164, executor 1, partition 3, AN
Y, 4730 bytes)19/01/24 10:50:22 INFO TaskSetManager: Lost task 4.1 in stage 13.0 (TID 980) on 34.55.0.164, executor 1: java.lang.Illega
lStateException (Error reading delta file /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/4/1.delta of HDFSStateStoreProvider[id = (op=0, part=4), dir = /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/4]: /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/4/1.delta does not exist) [duplicate 1]19/01/24 10:50:22 INFO TaskSetManager: Starting task 4.2 in stage 13.0 (TID 984, 34.55.0.164, executor 1, partition 4, AN
Y, 4730 bytes)19/01/24 10:50:22 INFO TaskSetManager: Lost task 5.2 in stage 13.0 (TID 982) on 34.55.0.164, executor 1: java.lang.Illega
lStateException (Error reading delta file /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/5/1.delta of HDFSStateStoreProvider[id = (op=0, part=5), dir = /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/5]: /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/5/1.delta does not exist) [duplicate 2]19/01/24 10:50:22 INFO TaskSetManager: Starting task 5.3 in stage 13.0 (TID 985, 34.55.0.164, executor 1, partition 5, AN
Y, 4730 bytes)19/01/24 10:50:22 INFO TaskSetManager: Lost task 4.2 in stage 13.0 (TID 984) on 34.55.0.164, executor 1: java.lang.Illega
lStateException (Error reading delta file /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/4/1.delta of HDFSStateStoreProvider[id = (op=0, part=4), dir = /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/4]: /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/4/1.delta does not exist) [duplicate 2]19/01/24 10:50:22 INFO TaskSetManager: Starting task 4.3 in stage 13.0 (TID 986, 34.55.0.164, executor 1, partition 4, AN
Y, 4730 bytes)19/01/24 10:50:22 INFO TaskSetManager: Lost task 3.3 in stage 13.0 (TID 983) on 34.55.0.164, executor 1: java.lang.Illega
lStateException (Error reading delta file /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/3/1.delta of HDFSStateStoreProvider[id = (op=0, part=3), dir = /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/3]: /tmp/temporary-507089d7-9a64-40aa-9e8e-ab8a276f5bcf/state/0/3/1.delta does not exist) [duplicate 3]19/01/24 10:50:22 ERROR TaskSetManager: Task 3 in stage 13.0 failed 4 times; aborting job
19/01/24 10:50:22 INFO TaskSchedulerImpl: Cancelling stage 13
19/01/24 10:50:22 INFO TaskSchedulerImpl: Stage 13 was cancelled

【问题讨论】:

有点晚了,但无论如何,我在 pyspark 2.4.2 上遇到了完全相同的问题。最终我也改变了我的检查点目录和主题名称。奇怪,但它有效 这里有同样的问题。 【参考方案1】:

Spark 在检查点位置保存/存储流状态(如果您愿意,可以使用 hdfs)。如果特定状态突然停止/失败(例如在动态节点分配应用程序中丢失执行程序).. spark 无法从中选择...解决方案是清除偏移量并重新开始(主要取决于用例)

【讨论】:

以上是关于结构化流增量文件不存在的主要内容,如果未能解决你的问题,请参考以下文章

增量文件版本的 Pyspark 结构化流错误

odi增量更新策略

IO流创建文件或文件夹

无法打开 geoip.dat 文件。即使文件存在,“无法打开流”

Databricks - 从增量表写入流到 orc 文件的读取流仅具有更改

java 中 “文件” 和 “流” 的简单分析