RDD[Array[Int]] 上的 Scala Reduce() 操作

Posted

技术标签:

【中文标题】RDD[Array[Int]] 上的 Scala Reduce() 操作【英文标题】:Scala Reduce() operation on a RDD[Array[Int]] 【发布时间】:2018-12-08 15:28:09 【问题描述】:

我有一个一维矩阵的 RDD。我正在尝试做一个非常基本的reduce操作来总结来自各个分区的矩阵相同位置的值。

我正在使用:

var z=x.reduce((a,b)=>a+b)

var z=x.reduce(_ + _)

但我收到一条错误消息: 类型不匹配;找到 Array[Int],预期:String

我查了一下,找到了链接 Is there a better way for reduce operation on RDD[Array[Double]]

所以我尝试使用 import.spire.implicits._ 所以现在我没有任何编译错误,但是在运行代码后我得到了一个 java.lang.NoSuchMethodError。我在下面提供了整个错误。任何帮助将不胜感激。

java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
at spire.math.NumberTag$Integral$.<init>(NumberTag.scala:9)
at spire.math.NumberTag$Integral$.<clinit>(NumberTag.scala)
at spire.std.BigIntInstances.$init$(bigInt.scala:80)
at spire.implicits$.<init>(implicits.scala:6)
at spire.implicits$.<clinit>(implicits.scala)
at main.scala.com.ucr.edu.SparkScala.HistogramRDD$$anonfun$9.apply(HistogramRDD.scala:118)
at main.scala.com.ucr.edu.SparkScala.HistogramRDD$$anonfun$9.apply(HistogramRDD.scala:118)
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:190)
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:185)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1012)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1010)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

【问题讨论】:

您的归约函数似乎试图添加两个数组,您真正想要的是明智地添加组件。 它应该可以正常工作。你是如何创建 RDD 的? 【参考方案1】:

据我了解,您正在尝试按数组中的位置减少项目。您应该考虑在减少 rdd 的同时压缩数组:

val a: RDD[Array[Int]] = ss.createDataset[Array[Int]](Seq(Array(1,2,3), Array(4,5,6))).rdd

    a.reducecase (a: Array[Int],b: Array[Int]) =>
        val ziped = a.zip(b)
        ziped.mapcase (i1, i2) => i1 + i2
    .foreach(println)

输出:

5
7
9

【讨论】:

我是 Scala 和 Spark 的新手。因此,一个后续问题。压缩函数是否会增加任何额外的处理开销?当我在一个包含 12 台机器、数据集约为 23 GB 的集群中运行此代码时,需要花费数小时来处理,这对我来说太长了。正是这个 reduce 函数需要最长时间。 zip 方法适用于可迭代对象并创建一个新的可迭代对象,因此它当然会增加处理和内存开销,但在大多数情况下并没有什么意义。阵列有多长?您的 12 台机器集群中有多少个核心,您正在处理多少条记录?所有记录的大小都相同吗?您可以尝试更改分区数。

以上是关于RDD[Array[Int]] 上的 Scala Reduce() 操作的主要内容,如果未能解决你的问题,请参考以下文章

Spark-Core RDD行动算子

将 Scala 的 K-means 应用于 rdd 的每个元素。

如何通过 RDD Scala 与 join 进行映射

scala mapPartitionsWithIndex函数的使用

Spark-Core RDD转换算子-双Value型交互

使用 Spark Scala 对数组元素求和