Apache Spark Reduce 与 java.lang.Math.max 意外行为

Posted

技术标签:

【中文标题】Apache Spark Reduce 与 java.lang.Math.max 意外行为【英文标题】:Apache Spark Reduce with java.lang.Math.max unexpected behaviour 【发布时间】:2016-03-19 21:14:35 【问题描述】:

在将 Spark reduce 函数与 java.lang.Math.max 结合使用时,我遇到了一些意外行为。这是示例代码:

JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator));
System.out.println(populationWithFitness.values().collect().toString());
long currentMaxFitness = populationWithFitness.values().reduce(Math::max);
System.out.println("After Reduce: " + currentMaxFitness);

上面的代码被多次调用,大多数时候会产生这样的意外结果:

[-2754285, -2535458, -2626449, -3182283] //printed RDD after collect
After Reduce: -2392513 //value produced by reducer

正如你所看到的,reducer 产生值-2392513,但是当与 RDD 的打印值比较时,这个值甚至不在 RDD 中。为什么? collect() 是否影响 reduce()?我也尝试过其他方式,首先减少然后收集原始 RDD,但我仍然观察到这种奇怪的行为。我在想从java.Math 库传递静态方法可能会导致序列化时出现问题,但参考这个Spark Quick Start Tutorial 他们也在reducer 中使用Math.max,显然它应该可以工作。

有什么想法吗?

谢谢

编辑:

附加信息:此 sn-p 是具有多次迭代的较大程序的一部分,并且在每次迭代中都会被调用。第一次迭代产生正确的结果,其中从 reducer 产生的 maxValue 是正确的值,但所有其他迭代都产生奇怪的结果

EDIT2:

当我像这样连续打印 3 次 populationWithFitness.values().collect().toString() 时:

JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator));
System.out.println(populationWithFitness.values().collect().toString());
System.out.println(populationWithFitness.values().collect().toString());
System.out.println(populationWithFitness.values().collect().toString());
long currentMaxFitness = populationWithFitness.values().reduce(Math::max);
System.out.println("After Reduce: " + currentMaxFitness);

我得到如下所示的输出:

Generation 1
[-3187591, -3984035, -3508984, -3054649]
[-3187591, -3984035, -3508984, -3054649]
[-3187591, -3984035, -3508984, -3054649]
After Reduce: -3054649
Generation 2
[-3084310, -3931687, -3508984, -3054649]
[-3084310, -3847178, -3508984, -2701881]
[-3148206, -3984035, -2806859, -2989184]
After Reduce: -2949478
Generation 3
[-3187591, -3984035, -3696853, -3054649]
[-3187591, -3984035, -3178920, -3015411]
[-3148206, -3804759, -3657984, -2701881]
After Reduce: -2710313
Generation 4
[-3187591, -2982220, -3310753, -3054649]
[-3148206, -2985628, -3657984, -2701881]
[-3148206, -2706580, -3451228, -2989184]
After Reduce: -2692651
.
.
.

正如您在第一次迭代中看到的那样,一切正常,但在所有下一次迭代中,它都会产生奇怪的输出。我想问题是它与惰性评估有关,当我调用 collect 时,它没有应用转换,但我不确定。

我也尝试将reduce(Math::max) 替换为JavaDoubleRDD 并在此JavaDoubleRDD 上调用max,但结果相同:

JavaDoubleRDD stats = populationWithFitness.mapToDouble(tup -> tup._2());
long currentMaxFitness = stats.max().longValue();

另一个重要的一点是,我在 local 模式下测试这段代码并使用参数运行它:

spark --class "main.TravellingSalesmanMain" --master local[4] exampletravellingsalesman-1.0-SNAPSHOT.jar > sparkoutput.txt

【问题讨论】:

这真的是实际代码吗? 是的,除了我应用map()sampleRdd 是一些在映射后产生Long 值的对象的集合。其他部分是我的代码中的 sn-ps,带有替换的变量名称,以便于参考。提供的示例输出也是我得到的实际输出。 你的rdd的来源是什么? 我已经用完整的源代码片段更新了我的问题。我应该提到populationWithFitness.values().collect().toString() 仅用于调试目的,我不打算在代码的最终版本中使用它,我只需要检查reduce(Math::max) 是否产生预期结果 你能提供一个最小的、可重现的例子吗? evaluateFitness 是做什么的?该代码有很多隐藏的因素。你在变异什么?如果连续多次打印collect 会发生什么? 【参考方案1】:

这很可能 (99%) 发生在 evaluateFitness(isl, fitnessCalculator) 内部的某个地方。似乎它正在使用某种不可重现的来源,因此正在发回不同的运行结果。请记住,Spark 是惰性的,执行将在每个后续操作上重新运行。您可以使用缓存来帮助解决此问题,但即使这样也可能会失败(节点失败/数据超出缓存)。最好的办法是在这里使用检查点,但更重要的是,您应该更改执行本身,使其具有幂等性。

【讨论】:

非常感谢。 Cache 为我工作。你对非确定性map 函数是正确的,但它不是evaluateFitness,而是随后的selectioncrossover 函数(我猜你从变量命名得出结论,我说的是遗传算法),它需要一些随机因素为了工作,所以我不能改变实现位。我链接map 阶段并调用reduce 仅用于最大适应度和最终人口以获得最佳解决方案。我关注了一些研究论文,他们在 Hadoop 上实现了它,但 Spark 惰性 eval 使它有点棘手。

以上是关于Apache Spark Reduce 与 java.lang.Math.max 意外行为的主要内容,如果未能解决你的问题,请参考以下文章

Spark:Reduce()与Fold()之间的区别[重复]

Spark Streaming的样本demo统计

SparkStreaming wordcount demo

SparkStreaming wordcount demo

Spark streaming storm map reduce区别与联系

Spark- Action实战