Apache Spark Reduce 与 java.lang.Math.max 意外行为
Posted
技术标签:
【中文标题】Apache Spark Reduce 与 java.lang.Math.max 意外行为【英文标题】:Apache Spark Reduce with java.lang.Math.max unexpected behaviour 【发布时间】:2016-03-19 21:14:35 【问题描述】:在将 Spark reduce
函数与 java.lang.Math.max
结合使用时,我遇到了一些意外行为。这是示例代码:
JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator));
System.out.println(populationWithFitness.values().collect().toString());
long currentMaxFitness = populationWithFitness.values().reduce(Math::max);
System.out.println("After Reduce: " + currentMaxFitness);
上面的代码被多次调用,大多数时候会产生这样的意外结果:
[-2754285, -2535458, -2626449, -3182283] //printed RDD after collect
After Reduce: -2392513 //value produced by reducer
正如你所看到的,reducer 产生值-2392513
,但是当与 RDD 的打印值比较时,这个值甚至不在 RDD 中。为什么? collect()
是否影响 reduce()
?我也尝试过其他方式,首先减少然后收集原始 RDD,但我仍然观察到这种奇怪的行为。我在想从java.Math
库传递静态方法可能会导致序列化时出现问题,但参考这个Spark Quick Start Tutorial 他们也在reducer
中使用Math.max
,显然它应该可以工作。
有什么想法吗?
谢谢
编辑:
附加信息:此 sn-p 是具有多次迭代的较大程序的一部分,并且在每次迭代中都会被调用。第一次迭代产生正确的结果,其中从 reducer
产生的 maxValue
是正确的值,但所有其他迭代都产生奇怪的结果
EDIT2:
当我像这样连续打印 3 次 populationWithFitness.values().collect().toString()
时:
JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator));
System.out.println(populationWithFitness.values().collect().toString());
System.out.println(populationWithFitness.values().collect().toString());
System.out.println(populationWithFitness.values().collect().toString());
long currentMaxFitness = populationWithFitness.values().reduce(Math::max);
System.out.println("After Reduce: " + currentMaxFitness);
我得到如下所示的输出:
Generation 1
[-3187591, -3984035, -3508984, -3054649]
[-3187591, -3984035, -3508984, -3054649]
[-3187591, -3984035, -3508984, -3054649]
After Reduce: -3054649
Generation 2
[-3084310, -3931687, -3508984, -3054649]
[-3084310, -3847178, -3508984, -2701881]
[-3148206, -3984035, -2806859, -2989184]
After Reduce: -2949478
Generation 3
[-3187591, -3984035, -3696853, -3054649]
[-3187591, -3984035, -3178920, -3015411]
[-3148206, -3804759, -3657984, -2701881]
After Reduce: -2710313
Generation 4
[-3187591, -2982220, -3310753, -3054649]
[-3148206, -2985628, -3657984, -2701881]
[-3148206, -2706580, -3451228, -2989184]
After Reduce: -2692651
.
.
.
正如您在第一次迭代中看到的那样,一切正常,但在所有下一次迭代中,它都会产生奇怪的输出。我想问题是它与惰性评估有关,当我调用 collect 时,它没有应用转换,但我不确定。
我也尝试将reduce(Math::max)
替换为JavaDoubleRDD
并在此JavaDoubleRDD
上调用max
,但结果相同:
JavaDoubleRDD stats = populationWithFitness.mapToDouble(tup -> tup._2());
long currentMaxFitness = stats.max().longValue();
另一个重要的一点是,我在 local 模式下测试这段代码并使用参数运行它:
spark --class "main.TravellingSalesmanMain" --master local[4] exampletravellingsalesman-1.0-SNAPSHOT.jar > sparkoutput.txt
【问题讨论】:
这真的是实际代码吗? 是的,除了我应用map()
的sampleRdd
是一些在映射后产生Long
值的对象的集合。其他部分是我的代码中的 sn-ps,带有替换的变量名称,以便于参考。提供的示例输出也是我得到的实际输出。
你的rdd的来源是什么?
我已经用完整的源代码片段更新了我的问题。我应该提到populationWithFitness.values().collect().toString()
仅用于调试目的,我不打算在代码的最终版本中使用它,我只需要检查reduce(Math::max)
是否产生预期结果
你能提供一个最小的、可重现的例子吗? evaluateFitness
是做什么的?该代码有很多隐藏的因素。你在变异什么?如果连续多次打印collect
会发生什么?
【参考方案1】:
这很可能 (99%) 发生在 evaluateFitness(isl, fitnessCalculator)
内部的某个地方。似乎它正在使用某种不可重现的来源,因此正在发回不同的运行结果。请记住,Spark 是惰性的,执行将在每个后续操作上重新运行。您可以使用缓存来帮助解决此问题,但即使这样也可能会失败(节点失败/数据超出缓存)。最好的办法是在这里使用检查点,但更重要的是,您应该更改执行本身,使其具有幂等性。
【讨论】:
非常感谢。Cache
为我工作。你对非确定性map
函数是正确的,但它不是evaluateFitness
,而是随后的selection
和crossover
函数(我猜你从变量命名得出结论,我说的是遗传算法),它需要一些随机因素为了工作,所以我不能改变实现位。我链接map
阶段并调用reduce
仅用于最大适应度和最终人口以获得最佳解决方案。我关注了一些研究论文,他们在 Hadoop 上实现了它,但 Spark 惰性 eval 使它有点棘手。以上是关于Apache Spark Reduce 与 java.lang.Math.max 意外行为的主要内容,如果未能解决你的问题,请参考以下文章
Spark:Reduce()与Fold()之间的区别[重复]