Recovering from "Offsets out of range with no configured reset policy for partitions"


【Posted】: 2020-07-29 01:12:37
【Question】:

I have a Spark Structured Streaming application (Spark 2.4.5) reading from Kafka. The application was down for a while, but when I restarted it I got the error below.

I fully understand why the error occurs and I'm fine with that; I just can't seem to get past it. In the logs I see "Recovering from the earliest offset: 1234332978", but that does not actually seem to happen. I also tried deleting the "sources" folder inside my checkpoint location, but that didn't help either.

My code uses the mapGroupsWithState function, so I have state data that I don't want to lose; deleting the entire checkpoint directory is therefore not my preferred approach. I have set:

.option("failOnDataLoss", false) .option("startingOffsets", "latest")

But these options seem to apply only to new partitions.
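For reference, here is a minimal Scala sketch of how these options are typically passed to the Kafka source. The broker address, checkpoint path, object name and console sink are placeholders rather than the asker's actual code; only the topic name is taken from the log below.

    import org.apache.spark.sql.SparkSession

    object KafkaOffsetRecoverySketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("kafka-offset-recovery-sketch")
          .getOrCreate()

        // startingOffsets only takes effect when the query starts without a
        // checkpoint (or for newly discovered partitions); with an existing
        // checkpoint, Spark resumes from the offsets stored there.
        val kafka = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092") // placeholder brokers
          .option("subscribe", "cmusstats")                  // topic from the log below
          .option("startingOffsets", "latest")
          .option("failOnDataLoss", "false")                 // warn and skip instead of failing
          .load()

        // Minimal console sink just to make the sketch runnable; the real query
        // uses mapGroupsWithState and a durable sink instead.
        val query = kafka
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream
          .format("console")
          .option("checkpointLocation", "/tmp/checkpoints/cmusstats") // placeholder path
          .start()

        query.awaitTermination()
      }
    }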

Is there a way to tell Spark to accept the missing offsets and carry on? Or some way to manually delete the offset data without affecting the application's "state"?

20/07/29 01:02:40 WARN InternalKafkaConsumer: Cannot fetch offset 1215191190 (GroupId: spark-kafka-source-f9562fca-ab0c-4f7a-93c3-20506cbcdeb7--1440771761-executor, TopicPartition: cmusstats-0). 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you want your streaming query to fail on such cases, set the source
 option "failOnDataLoss" to "true".
    
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: cmusstats-0=1215191190
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchData(KafkaDataConsumer.scala:470)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchRecord(KafkaDataConsumer.scala:361)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:251)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:234)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:64)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:500)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.next(KafkaMicroBatchReader.scala:357)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/07/29 01:02:40 WARN InternalKafkaConsumer: Some data may be lost. Recovering from the earliest offset: 1234332978
20/07/29 01:02:40 WARN InternalKafkaConsumer: 
The current available offset range is AvailableOffsetRange(1234332978,1328165875).
 Offset 1215191190 is out of range, and records in [1215191190, 1215691190) will be
 skipped (GroupId: spark-kafka-source-f9562fca-ab0c-4f7a-93c3-20506cbcdeb7--1440771761-executor, TopicPartition: cmusstats-0). 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you want your streaming query to fail on such cases, set the source
 option "failOnDataLoss" to "true".

【Comments】:

Does this answer your question? Spark set to read from earliest offset - throws error on attempting to consumer an offset no longer available on Kafka

@mike Unfortunately, the solution there doesn't work for me. In Structured Streaming the Kafka group.id and offsets are managed by Spark, so they cannot be reset on the Kafka side. That solution applies to Kafka DStreams.

【Answer 1】:

It turns out the Structured Streaming application did eventually recover. For a while it logged many "Cannot fetch offset" errors, but after some time processing continued from the earliest available offset.

I can't explain why so many of these errors were logged before processing resumed, but it did eventually continue.
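Not part of the answer itself, but one way to confirm that the query really has moved past the missing range is to inspect the streaming progress, where the Kafka source reports the offset range of the last micro-batch. A minimal sketch, assuming `query` is the handle returned by writeStream.start() in the earlier sketch and the helper object name is hypothetical:

    import org.apache.spark.sql.streaming.StreamingQuery

    object ProgressCheck {
      // Prints the offset range covered by the most recent micro-batch. Once the
      // reported startOffset is at or beyond the earliest available offset from
      // the warning (1234332978 here), the query has moved past the missing data.
      def logKafkaProgress(query: StreamingQuery): Unit = {
        val progress = query.lastProgress // null until the first batch completes
        if (progress != null) {
          progress.sources.foreach { s =>
            println(s"source=${s.description} startOffset=${s.startOffset} endOffset=${s.endOffset}")
          }
        }
      }
    }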

【Discussion】:
