使用两个 RDD apache spark
Posted
技术标签:
【中文标题】使用两个 RDD apache spark【英文标题】:working with two RDDs apache spark 【发布时间】:2014-06-12 21:38:38 【问题描述】:我正在使用 calliope 即 spark 插件与 cassandra 连接。我创建了 2 个看起来像
的 RDDclass A
val persistLevel = org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
val cas1 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 1")
val sc1 = new SparkContext("local", "name it any thing ")
var rdd1 = sc.cql3Cassandra[SCALACLASS_1](cas1)
var rddResult1 = rdd1.persist(persistLevel)
class B
val cas2 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 2")
var rdd2 = sc1.cql3Cassandra[SCALACLASS_2](cas2)
var rddResult2 = rdd2.persist(persistLevel)
不知何故,使用其他 2 个创建新 RDD 的代码库无法正常工作。有没有可能我们不能同时使用 2 个 RDD 进行迭代?
这是无效的代码 sn-p -
case class Report(id: Long, anotherId: Long)
var reportRDD = rddResult2.flatMap(f =>
val buf = List[Report]()
**rddResult1.collect().toList**.foldLeft(buf)((k, v) =>
val buf1 = new ListBuffer[Report]
buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) =>
buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2)
)
)
)
如果我替换粗体并为它初始化一个 val -
val collection = rddResult1.collect().toList
var reportRDD = rddResult2.flatMap(f =>
val buf = List[Report]()
**collection**.foldLeft(buf)((k, v) =>
val buf1 = new ListBuffer[Report]
buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) =>
buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2)
)
)
)
有效,有什么解释吗?
【问题讨论】:
你遇到了什么错误? 这是一大堆代码,看起来与问题无关。介意清理一下吗?你会发现没有在 Cassandra 中混合的相同行为,以及foldLeft
内部的复杂性。否则,这将是一个很好的问题!
【参考方案1】:
您将转换与动作混合在一起。 rdd2.flatMap
的关闭在工作人员上执行,而rdd1.collect
是 Spark 术语中的“动作”,并将数据返回给驱动程序。因此,非正式地说,当您尝试对其进行平面映射时,您可以说数据不存在。 (我对内部结构的了解还不够——还无法确定确切的根本原因)
如果您想对两个 RDD 进行分布式操作,您应该使用其中一种连接函数(join、leftOuterJoin、rightOuterJoin、cogroup)来连接它们。
例如
val mappedRdd1 = rdd1.map(x=> (x.id,x))
val mappedRdd2 = rdd2.map(x=> (x.customerId, x))
val joined = mappedRdd1.join(mappedRdd2)
joined.flatMap(...reporting logic..).collect
【讨论】:
感谢您的帮助,但不知何故,我没有获得可用于 RDD 的连接功能。但是,uningnew PairRDDFunctions(rdd1).join(rdd2)
起作用了。
您应该import org.apache.spark.SparkContext._
并将您的原始rdd 映射到pairRDD。连接是在键上完成的,它是 PairRDD 中元组的第一个元素。使用我上面给出的示例:val mappedRdd1 = rdd1.map(x=> (x.id,x))
在您的数据模型上进行自然 PK 就可以了。【参考方案2】:
您可以在应用程序中对 RDD 进行操作。但是您不能在执行器(工作节点)中对 RDD 进行操作。执行者不能发出命令来驱动集群。 flatMap
中的代码在执行器上运行。
在第一种情况下,您尝试在 executor 中对 RDD 进行操作。我认为你会得到一个NotSerializableException
,因为你甚至不能将 RDD 对象发送给执行者。在第二种情况下,您将 RDD 内容拉取到应用程序,然后将这个简单的List
发送给执行程序。 (Lambda 捕获会自动序列化。)
【讨论】:
以上是关于使用两个 RDD apache spark的主要内容,如果未能解决你的问题,请参考以下文章
使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集
如何在 Spark 中将两个 RDD[string] 合并在一起?