迭代cogrouped RDD

Posted

技术标签:

【中文标题】迭代cogrouped RDD【英文标题】:Iterating over cogrouped RDD 【发布时间】:2015-10-29 18:47:59 【问题描述】:

我使用了一个cogroup函数并获得了以下RDD:

org.apache.spark.rdd.RDD[(Int, (Iterable[(Int, Long)], Iterable[(Int, Long)]))]

在地图操作之前加入的对象应该是这样的:

RDD[(Int, (Iterable[(Int, Long)], Iterable[(Int, Long)]))]

(-2095842000,(CompactBuffer((1504999740,1430096464017), (613904354,1430211912709), (-1514234644,1430288363100), (-276850688,1430330412225)),CompactBuffer((-511732877,1428682217564), (1133633791,1428831320960), (1168566678,1428964645450), (-407341933,1429009306167), (-1996133514,1429016485487), (872888282,1429031501681), (-826902224,1429034491003), (818711584,1429111125268), (-1068875079,1429117498135), (301875333,1429121399450), (-1730846275,1429131773065), (1806256621,1429135583312))))
(352234000,(CompactBuffer((1350763226,1430006650167), (-330160951,1430320010314)),CompactBuffer((2113207721,1428994842593), (-483470471,1429324209560), (1803928603,1429426861915))))

现在我想做以下事情:

val globalBuffer = ListBuffer[Double]()
val joined = data1.cogroup(data2).map(x => 
  val listA = x._2._1.toList
  val listB = x._2._2.toList
  for(tupleB <- listB) 
    val localResults = ListBuffer[Double]()
    val itemToTest = Set(tupleB._1)
    val tempList = ListBuffer[(Int, Double)]()
    for(tupleA <- listA) 
      val tValue = someFunctionReturnDouble(tupleB._2, tupleA._2)
      val i = (tupleA._1, tValue)
      tempList += i
    
    val sortList = tempList.sortWith(_._2 > _._2).slice(0,20).map(i => i._1)
    val intersect = sortList.toSet.intersect(itemToTest)
    if (intersect.size > 0)
      localResults += 1.0
    else localResults += 0.0
    val normalized = sum(localResults.toList)/localResults.size
    globalBuffer += normalized
  
)

//method sum
def sum(xs: List[Double]): Double = //do the sum

最后,我期望 join 是一个具有双值的列表。但是当我看到它时,它是单位。我也认为这不是 Scala 的做法。如何获得globalBuffer作为最终结果。

【问题讨论】:

【参考方案1】:

嗯,如果我正确理解了您的代码,它将受益于这些改进:

val joined = data1.cogroup(data2).map(x => 
  val listA = x._2._1.toList
  val listB = x._2._2.toList
  val localResults = listB.map  
    case (intBValue, longBValue) =>
    val itemToTest = intBValue // it's always one element
    val tempList = listA.map 
       case (intAValue, longAValue) =>
       (intAValue, someFunctionReturnDouble(longBvalue, longAValue))
    
    val sortList = tempList.sortWith(-_._2).slice(0,20).map(i => i._1)
    if (sortList.toSet.contains(itemToTest))  1.0  else 0.0
// no real need to convert to a set for 20 elements, by the way
  
  sum(localResults)/localResults.size
)

【讨论】:

@Null-Hypothesis,您能否澄清您在之前评论中的意思。你的意思是当你在listB 上“循环”时你想要某种运行比例?请进一步澄清。 @Null-Hypothesis 是的,它在 Scala 中有效:函数体中最后一条指令的结果成为该函数返回的值。它将是 RDD[Double],如果需要,您必须将其转换为列表。 @Null-Hypothesis,所以对于每个 x 你想要一个单一的比例?如果是这样,那么我认为无论是阿莎琳德的方法还是我的方法(它们与一些小细节的模数几乎相同)都可以正常工作。假设这是您最终想要的,您只需执行joined.collect 即可获取本地集合。 @Null-Hypothesis,忘记globalBuffer。请参阅我的回答,了解为什么这不起作用。由 Ashalynd 或我的代码创建的 RDD joined 将包含您想要的所有比例。如果你想在本地 Array 中使用它,那么只需使用 val localJoined = joined.collect。如果您在List 中需要它,请使用val localJoined = joined.collect.toList 或者您可以根据需要将.collect.toList 权限添加到joined 的定义中。【参考方案2】:

RDDs 的转换不会修改 globalBuffer复制 globalBuffer 并发送给每个工作人员,但是对工作人员上这些副本的任何修改都不会修改驱动程序上存在的 globalBuffer(您定义的那个)在RDD 上的map 之外。)这是我所做的(还有一些额外的修改):

val joined = data1.cogroup(data2) map  x =>
  val iterA = x._2._1
  val iterB = x._2._2
  var count, positiveCount = 0
  val tempList = ListBuffer[(Int, Double)]()
  for (tupleB <- iterB) 
    tempList.clear
    for(tupleA <- iterA) 
      val tValue = someFunctionReturnDouble(tupleB._2, tupleA._2)
      tempList += ((tupleA._1, tValue))
    
    val sortList = tempList.sortWith(_._2 > _._2).iterator.take(20)
    if (sortList.exists(_._1 == tupleB._1)) positiveCount += 1
    count += 1
  
  positiveCount.toDouble/count

此时您可以使用joined.collect获取比例的本地副本。

【讨论】:

@Null-Hypothesis,请参阅我在代码前的评论——RDD 上的转换 不能 修改驱动程序上任何对象的状态(除非您是使用Accumulators/Accumalables,你不是......我不建议在这种情况下。) @Null-Hypothesis,如果你不知道(如果你知道,请原谅我),在 Scala 中,最后评估的值是函数返回的值。所以上面的函数(传入RDDmap方法)会将每个x映射到positiveCount.toDouble/count。应用于此函数的map 方法的结果将是一个包含这些比例的RDD

以上是关于迭代cogrouped RDD的主要内容,如果未能解决你的问题,请参考以下文章

迭代 RDD 迭代器并应用限制时,Spark 似乎没有调用 hasNext

在 rdd 中迭代 CompactBuffer

RDD内存迭代原理

Pyspark:使用 map 函数而不是 collect 来迭代 RDD

PySpark:在“NoneType”对象上过滤掉 RDD 元素失败是不可迭代的

apache spark - 迭代地跳过并从 RDD 中获取