Spark 如何跟踪 randomSplit 中的拆分?

Posted

技术标签:

【中文标题】Spark 如何跟踪 randomSplit 中的拆分?【英文标题】:How does Spark keep track of the splits in randomSplit? 【发布时间】:2016-07-14 16:28:40 【问题描述】:

这个问题解释了 Spark 的随机拆分是如何工作的,How does Sparks RDD.randomSplit actually split the RDD,但我不明白 spark 如何跟踪哪些值进入了一个拆分,以便这些相同的值不会进入第二个拆分。

如果我们看一下randomSplit的实现:

def randomSplit(weights: Array[Double], seed: Long): Array[DataFrame] = 
 // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
 // constituent partitions each time a split is materialized which could result in
 // overlapping splits. To prevent this, we explicitly sort each input partition to make the
 // ordering deterministic.

 val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
 val sum = weights.sum
 val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
 normalizedCumWeights.sliding(2).map  x =>
  new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted))
.toArray

我们可以看到它创建了两个共享相同 sqlContext 和两个不同 Sample(rs) 的 DataFrame。

这两个 DataFrame(s) 如何相互通信,以使第一个中的值不包含在第二个中?

数据是否被提取了两次? (假设 sqlContext 是从数据库中选择的,选择是否被执行了两次?)。

【问题讨论】:

【参考方案1】:

这与对 RDD 进行采样完全相同。

假设您有权重数组(0.6, 0.2, 0.2),Spark 将为每个范围(0.0, 0.6), (0.6, 0.8), (0.8, 1.0) 生成一个DataFrame。

当需要读取结果 DataFrame 时,Spark 只会遍历父 DataFrame。对于每个项目,生成一个随机数,如果该数字在指定范围内,则发出该项目。所有子 DataFrame 共享相同的随机数生成器(技术上,不同的生成器具有相同的种子),因此随机数的序列是确定性的。

对于您的最后一个问题,如果您没有缓存父 DataFrame,那么每次计算输出 DataFrame 时都会重新获取输入 DataFrame 的数据。

【讨论】:

我要强调的是,一个完整的技巧是为每个 Sample 使用相同的种子。

以上是关于Spark 如何跟踪 randomSplit 中的拆分?的主要内容,如果未能解决你的问题,请参考以下文章

Spark:如何获得伯努利朴素贝叶斯的概率和 AUC?

Spark算子

Spark RDD操作记录(总结)

spark-shell - 如何避免抑制省略堆栈跟踪(异常)

org.apache.spark.sql.AnalysisException:无法从概率中提取值

randomSplit pyspark 更改数据帧的值