通过操作 JavaPairRDD 的值 (Sum) 来转换 JavaPairRDD
Posted
技术标签:
【中文标题】通过操作 JavaPairRDD 的值 (Sum) 来转换 JavaPairRDD【英文标题】:Transform JavaPairRDD by manipulating its values (Sum) Spark 【发布时间】:2017-07-22 01:14:58 【问题描述】:使用 Java 处理 apache spark。我有一个 JavaPairRDD RDD1,我想通过对 RDD1 的值求和来创建另一个 JavaPairRdd RDD2。但是,当我执行以下代码时,它会阻塞在 test_3 转换中而没有任何错误消息。我认为这与在另一个转换中执行 rdd 转换或操作的问题有关。
JavaPairRDD<Key, JavaPairRDD<Integer, Double>> test_2 = test_1.mapToPair(new PairFunction<Tuple2<Key, JavaPairRDD<Integer, Double>>, Key, JavaPairRDD<Integer, Double>>()
@Override
public Tuple2<Key, JavaPairRDD<Integer, Double>> call(Tuple2<Key, JavaPairRDD<Integer, Double>> t) throws Exception
return new Tuple2(t._1,t._2.reduceByKey((Double val1, Double val2)
-> Math.pow(Math.abs(val1 - val2), 2)));
);
JavaPairRDD<Key, JavaPairRDD<Integer, Double>> test_3 = test_2.mapToPair
(new PairFunction<Tuple2<Key, JavaPairRDD<Integer, Double>>, Key, JavaPairRDD<Integer, Double>>()
@Override
public Tuple2<Key, JavaPairRDD<Integer, Double>> call(Tuple2<Key, JavaPairRDD<Integer, Double>> t)
throws Exception
return new Tuple2(t._1,t._2.values().reduce((Double t1, Double t2) -> t1+t2));
);
JavaPairRDD<Key, Double> test_4= test_3.mapToPair
(new PairFunction<Tuple2<Key, JavaPairRDD<Integer, Double>>, Key, Double>()
@Override
public Tuple2<Key, Double> call(Tuple2<Key, JavaPairRDD<Integer, Double>> t) throws Exception
return new Tuple2(t._1,t._2.values().first());
);
【问题讨论】:
【参考方案1】:您的问题早于test_3
。您不能将值作为 RDD。 JavaPairRDD<Key, JavaPairRDD<Integer, Double>>
你或许可以尝试返回一个 tuple2(Integer, Double) 的列表。像这样的东西:JavaPairRDD<Key, List<Tuple2<Integer, Double>>>
【讨论】:
其实我们可以将值作为一个RDD。如果你能看到,当我对 test_2 RDD 执行收集操作时,我得到的结果没有问题。相反,这里的问题与调用值操作以对它们求和有关。换句话说,我需要创建一个新的 javaPairRDD,其值为先前 JavaPairRDD (test_2---->test_3) 的值的总和。 对我来说如何在 RDD 中拥有 RDD 不太有意义,我以前从未见过这种用例。当您在test_2
上进行收集时,如果您打印出结果,您会得到什么?每个键都有一个 tuple2 列表?
为了回应您,如果我在 test_2 上执行收集,我会得到具有相同键的 JavaPairRDD 和使用以下公式计算的值;现在我需要通过计算pairRDD的值的总和来使用test_2做同样的事情,但是它阻塞了,因为我在RDD转换中调用了一个动作。以上是关于通过操作 JavaPairRDD 的值 (Sum) 来转换 JavaPairRDD的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Java 中的 javaPairRDD 上使用 aggregateByKey?