修改 Spark RDD foreach 中的集合

Posted

技术标签:

【中文标题】修改 Spark RDD foreach 中的集合【英文标题】:Modify collection inside a Spark RDD foreach 【发布时间】:2022-01-21 23:02:23 【问题描述】:

我正在尝试在迭代 RDD 的元素时向地图添加元素。我没有收到任何错误,但没有进行修改。

直接添加或迭代其他集合都可以正常工作:

scala> val myMap = new collection.mutable.HashMap[String,String]
myMap: scala.collection.mutable.HashMap[String,String] = Map()

scala> myMap("test1")="test1"

scala> myMap
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1)

scala> List("test2", "test3").foreach(w => myMap(w) = w)

scala> myMap
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

但是当我尝试从 RDD 中做同样的事情时:

scala> val fromFile = sc.textFile("tests.txt")
...
scala> fromFile.take(3)
...
res48: Array[String] = Array(test4, test5, test6)

scala> fromFile.foreach(w => myMap(w) = w)
scala> myMap
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

我已经尝试像在 foreach 之前一样打印地图的内容,以确保变量相同,并且打印正确:

fromFile.foreach(w => println(myMap("test1")))
...
test1
test1
test1
...

我还在 foreach 代码中打印了地图的修改元素,它打印为已修改,但是当操作完成时,地图似乎未修改。

scala> fromFile.foreach(w => myMap(w) = w; println(myMap(w)))
...
test4
test5
test6
...
scala> myMap
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

将 RDD 转换为数组(收集)也可以正常工作:

fromFile.collect.foreach(w => myMap(w) = w)
scala> myMap
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3)

这是上下文问题吗?我是否正在访问正在其他地方修改的数据副本?

【问题讨论】:

【参考方案1】:

在 Spark 集群(不是单台机器)上运行时会更清晰。 RDD 现在分布在多台机器上。当你调用foreach 时,你告诉每台机器如何处理它拥有的RDD 片段。如果您引用任何局部变量(如myMap),它们会被序列化并发送到机器,因此它们可以使用它。但是什么都没有回来。所以您的myMap 的原始副本不受影响。

我认为这回答了您的问题,但显然您正在尝试完成某些事情,而您将无法以这种方式到达那里。请随时在此处或在单独的问题中解释您要做什么,我会尽力提供帮助。

【讨论】:

它确实回答了我的问题,不用担心我想要完成什么,我只是发现这是一个有趣的案例,我没有解释。我现在知道了,谢谢! 是的,正如 Daniel 指出的那样,你不能改变状态,palako 有点错过了函数式编程的要点。您不应该处于变异状态,因为那样您就无法并行化。通过以不改变状态的方式设计代码,您的代码可以免费并行化,您可以使用 Spark 和 Scalding 等框架在集群中分发。 我认为没有单一的通用解决方法,您只需要在没有这种事情的情况下解决问题。这几乎总是可能的。例如,假设您想将 RDD 的一些元素放在一个集合中,有点像问题中的样子。那么解决方案是使用RDD.filter 而不是RDD.foreach,这样你就可以得到另一个包含你想要的元素的RDD。然后你可以通过 Spark 进一步处理它们,或者使用collect 将它们提取到驱动程序并在本地处理它们。

以上是关于修改 Spark RDD foreach 中的集合的主要内容,如果未能解决你的问题,请参考以下文章

Spark——RDD算子

spark算子 分为3大类

Spark RDD

Spark入门02

Spark02

Spark之RDD编程