使用两个 RDD apache spark

Posted

技术标签:

【中文标题】使用两个 RDD apache spark【英文标题】:working with two RDDs apache spark 【发布时间】:2014-06-12 21:38:38 【问题描述】:

我正在使用 calliope 即 spark 插件与 cassandra 连接。我创建了 2 个看起来像

的 RDD

class A val persistLevel = org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK val cas1 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 1") val sc1 = new SparkContext("local", "name it any thing ") var rdd1 = sc.cql3Cassandra[SCALACLASS_1](cas1) var rddResult1 = rdd1.persist(persistLevel)

class B val cas2 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 2") var rdd2 = sc1.cql3Cassandra[SCALACLASS_2](cas2) var rddResult2 = rdd2.persist(persistLevel)

不知何故,使用其他 2 个创建新 RDD 的代码库无法正常工作。有没有可能我们不能同时使用 2 个 RDD 进行迭代?

这是无效的代码 sn-p -

case class Report(id: Long, anotherId: Long)

  var reportRDD = rddResult2.flatMap(f => 
    val buf = List[Report]()
    **rddResult1.collect().toList**.foldLeft(buf)((k, v) => 
      val buf1 = new ListBuffer[Report]
      buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => 
        buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2)
      )
    )
  )

如果我替换粗体并为它初始化一个 val -

val collection = rddResult1.collect().toList

var reportRDD = rddResult2.flatMap(f => 
    val buf = List[Report]()
    **collection**.foldLeft(buf)((k, v) => 
      val buf1 = new ListBuffer[Report]
      buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => 
        buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2)
      )
    )
  )

有效,有什么解释吗?

【问题讨论】:

你遇到了什么错误? 这是一大堆代码,看起来与问题无关。介意清理一下吗?你会发现没有在 Cassandra 中混合的相同行为,以及 foldLeft 内部的复杂性。否则,这将是一个很好的问题! 【参考方案1】:

您将转换与动作混合在一起。 rdd2.flatMap 的关闭在工作人员上执行,而rdd1.collect 是 Spark 术语中的“动作”,并将数据返回给驱动程序。因此,非正式地说,当您尝试对其进行平面映射时,您可以说数据不存在。 (我对内部结构的了解还不够——还无法确定确切的根本原因)

如果您想对两个 RDD 进行分布式操作,您应该使用其中一种连接函数(join、leftOuterJoin、rightOuterJoin、cogroup)来连接它们。

例如

val mappedRdd1 = rdd1.map(x=> (x.id,x))
val mappedRdd2 = rdd2.map(x=> (x.customerId, x))

val joined = mappedRdd1.join(mappedRdd2)
joined.flatMap(...reporting logic..).collect

【讨论】:

感谢您的帮助,但不知何故,我没有获得可用于 RDD 的连接功能。但是,uning new PairRDDFunctions(rdd1).join(rdd2) 起作用了。 您应该import org.apache.spark.SparkContext._ 并将您的原始rdd 映射到pairRDD。连接是在键上完成的,它是 PairRDD 中元组的第一个元素。使用我上面给出的示例:val mappedRdd1 = rdd1.map(x=> (x.id,x)) 在您的数据模型上进行自然 PK 就可以了。【参考方案2】:

您可以在应用程序中对 RDD 进行操作。但是您不能在执行器(工作节点)中对 RDD 进行操作。执行者不能发出命令来驱动集群。 flatMap 中的代码在执行器上运行。

在第一种情况下,您尝试在 executor 中对 RDD 进行操作。我认为你会得到一个NotSerializableException,因为你甚至不能将 RDD 对象发送给执行者。在第二种情况下,您将 RDD 内容拉取到应用程序,然后将这个简单的List 发送给执行程序。 (Lambda 捕获会自动序列化。)

【讨论】:

以上是关于使用两个 RDD apache spark的主要内容,如果未能解决你的问题,请参考以下文章

使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集

如何在 Spark 中将两个 RDD[string] 合并在一起?

使用 Apache Spark / Spark SQL 加入文件

Spark之RDD算子-转换算子

spark 源码分析之一 -- RDD的四种依赖关系

Scala - 基于 Spark 中的键合并两个 RDD