SPARK outputDeterministicLevel的作用--任务全部重试或者部分重试
Posted 鸿乃江边鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK outputDeterministicLevel的作用--任务全部重试或者部分重试相关的知识,希望对你有一定的参考价值。
背景
目前spark的repartition()
方法是随机分配数据到下游,这会导致一个问题,有时候如果我们用repartition
方法的时候,如果任务发生了重试,就有可能导致任务的数据不准确,那这个时候改怎么解决这个问题呢?
分析
在Spark RDD中存在着名为outputDeterministicLevel的变量,如下:
private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value =
if (isReliablyCheckpointed)
DeterministicLevel.DETERMINATE
else
getOutputDeterministicLevel
那么该变量的作用是什么呢?让我们分析一下:
改变量最终会被Stage
的isIndeterminate
方法调用:
def isIndeterminate: Boolean =
rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
而该方法会被DAGScheduler
调用,有两处地方会被调用:
- submitMissingTasks中调用
private def submitMissingTasks(stage: Stage, jobId: Int): Unit =
logDebug("submitMissingTasks(" + stage + ")")
// Before find missing partition, do the intermediate state clean work first.
// The operation here can make sure for the partially completed intermediate stage,
// `findMissingPartitions()` returns all partitions every time.
stage match
case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
case _ =>
该方法主要用于在重新提交失败的stage时候,用来判断是否需要重新计算上游的所有任务。
- handleTaskCompletion中调用
case FetchFailed(bmAddress, shuffleId, _, mapIndex, _, failureMessage) =>
。。。
val noResubmitEnqueued = !failedStages.contains(failedStage)
failedStages += failedStage
failedStages += mapStage
if (noResubmitEnqueued)
// If the map stage is INDETERMINATE, which means the map tasks may return
// different result when re-try, we need to re-try all the tasks of the failed
// stage and its succeeding stages, because the input data will be changed after the
// map tasks are re-tried.
// Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
// guaranteed to be determinate, so the input data of the reducers will not change
// even if the map tasks are re-tried.
if (mapStage.isIndeterminate)
这里如果任务Fetch失败了,根据该shuffle所对应的上游stage是不是isIndeterminate
来向DAGScheduler
提交ResubmitFailedStages
事件,从而调用submitMissingTasks
方法进行上游所有任务或者单个任务的重试。
再回到outputDeterministicLevel
变量,该变量会调用getOutputDeterministicLevel
方法进行循环调用上游的outputDeterministicLevel
变量来确定outputDeterministicLevel
的值。
结论
所以根据以上分析,我们可以改写对应的RDD的outputDeterministicLevel
变量或者getOutputDeterministicLevel
方法来进行stage任务的全部重试与否
以上是关于SPARK outputDeterministicLevel的作用--任务全部重试或者部分重试的主要内容,如果未能解决你的问题,请参考以下文章