从另一个 rdd 中搜索 rdd 的值

Posted

技术标签:

【中文标题】从另一个 rdd 中搜索 rdd 的值【英文标题】:search rdd for value from another rdd 【发布时间】:2015-08-07 05:01:06 【问题描述】:

我正在使用 Spark + Scala。我的 rdd1 有客户信息,即 (id, [name, address])。 rdd2 只有知名客户的名字。现在我想查找 rdd1 中的客户是否高调。如何使用另一个搜索一个 rdd?加入 rdd 对我来说似乎不是一个好的解决方案。

我的代码:

val result = rdd1.map( case (id, customer) => 
  customer.foreach ( c => 
    rdd2.filter(_ == c._1).count()!=0 ))

错误org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations;

【问题讨论】:

"加入 rdd 对我来说并不是一个好的解决方案。"为什么不呢? 因为 rdd 没有公共密钥,内部连接不会让已经很大的 rdd 变得超级庞大? 谢谢保罗。我不得不刷新我的加入知识。对内连接和外连接感到困惑。 【参考方案1】:

你必须通过收集来广播一个rdd。您可以广播较小的 rdd 以提高性能。

val bcastRdd = sc.broadcast(rdd2.collect)
rdd1.map(
   case (id, customer) => customer.foreach(c => 
        bcastRdd.value.filter(_ == c._1).count()!=0))

【讨论】:

广播在这个例子中没有任何区别。闭包捕获的变量无论如何都会被广播。当您想在多个阶段使用某些东西时,您只需要显式广播。【参考方案2】:

您可以使用左外连接,以避免进行昂贵的操作,例如收集(如果您的 RDD 很大)

正如丹尼尔所指出的,广播不是必需的。

这是一个 sn-p,它可以帮助获得带有标志的 RDD1,该标志表示他是高调客户或低调客户。

val highProfileFlag = 1
val lowProfileFlag = 0 

// Keying rdd 1 by the name    
val rdd1Keyed = rdd1.map  case (id, (name, address)) => (name, (id, address)) 

// Keying rdd 2 by the name and adding a high profile flag
val rdd2Keyed = rdd2.map  case name => (name, highProfileFlag) 

// The join you are looking for is the left outer join
val rdd1HighProfileFlag = rdd1Keyed
.leftOuterJoin(rdd2Keyed)
.map  case (name, (id, address), highProfileString) => 
      val profileFlag = highProfileString.getOrElse(lowProfileFlag) 
      (id , (name, address, profileFlag))

【讨论】:

以上是关于从另一个 rdd 中搜索 rdd 的值的主要内容,如果未能解决你的问题,请参考以下文章

从另一个访问特定的 RDD 分区

Spark RDD编程

Apache Spark - 多个 RDD 的交集

(为啥)我们需要在 RDD 上调用缓存还是持久化

如何在 Python 中解压缩 RDD 中每个项目的值(列表)?

将 RDD 的值作为变量传递给另一个 RDD - Spark #Pyspark [重复]