为啥 foreach 没有给驱动程序带来任何东西?
Posted
技术标签:
【中文标题】为啥 foreach 没有给驱动程序带来任何东西?【英文标题】:Why does foreach not bring anything to the driver program?为什么 foreach 没有给驱动程序带来任何东西? 【发布时间】:2015-03-02 07:32:20 【问题描述】:我在 spark shell 中编写了这个程序
val array = sc.parallelize(List(1, 2, 3, 4))
array.foreach(x => println(x))
这会打印一些调试语句,但不会打印实际数字。
下面的代码可以正常工作
for(num <- array.take(4))
println(num)
我知道take
是一个动作,因此会导致 spark 触发惰性计算。
但是 foreach 应该以同样的方式工作......为什么 foreach
没有从 spark 中带回任何东西并开始进行实际处理(退出惰性模式)
如何使 rdd 上的 foreach 工作?
【问题讨论】:
【参考方案1】:Spark 中的RDD.foreach
方法在集群上运行,因此每个包含这些记录的工作人员都在运行foreach
中的操作。 IE。您的代码正在运行,但它们正在 Spark 工作人员标准输出上打印出来,而不是在驱动程序/您的 shell 会话中。如果您查看 Spark 工作人员的输出 (stdout),您会看到这些打印到控制台。
您可以通过转到为每个正在运行的执行程序运行的 web gui 来查看工作程序的标准输出。示例 URL 为 http://workerIp:workerPort/logPage/?appId=app-20150303023103-0043&executorId=1&logType=stdout
在本例中,Spark 选择将 RDD 的所有记录放在同一个分区中。
如果您考虑一下,这是有道理的 - 查看 foreach
的函数签名 - 它不会返回任何内容。
/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit
这确实是 scala 中 foreach
的目的 - 它用于副作用。
当您收集记录时,您会将它们带回驱动程序,因此逻辑上收集/获取操作只是在 Spark 驱动程序中的 Scala 集合上运行 - 您可以看到日志输出,因为 spark 驱动程序/spark shell 正在打印到会话中的标准输出。
foreach 的用例可能看起来并不明显,举个例子 - 如果您想要对 RDD 中的每条记录执行一些外部行为,例如调用 REST api,您可以在 foreach 中执行此操作,然后在每个 Spark 工作人员中执行此操作将使用该值向 API 服务器提交调用。如果 foreach 确实带回了记录,那么您很容易在驱动程序/shell 进程中耗尽内存。这样可以避免这些问题,并且可以对集群中 RDD 中的所有项目产生副作用。
如果您想查看我使用的 RDD 中的内容;
array.collect.foreach(println)
//Instead of collect, use take(...) or takeSample(...) if the RDD is large
【讨论】:
当您需要更新函数内部的累加器并希望操作保证它只会更新一次时,Foreach 非常有用。据我所知,这是 Spark 中唯一允许我在 RDD 上干净地运行函数的操作。 (Per the Spark Docs对于只在action内部执行的累加器更新,Spark保证每个任务对累加器的更新只会应用一次,即重新启动的任务不会更新值) 那么,foreach 真的会在工作节点中执行吗?在工作节点中运行是 spark right 的意图。我见过人们提到的地方,“不要在集群环境中使用 RDD.foreach()”。这是真的吗? @HariRam 我不确定“不要在集群环境中使用 RDD.foreach()”的上下文。如果操作成本很高,您可能需要考虑使用foreachpartition
,它在每个分区上运行并且可能效率更高(例如,如果插入数据库,则批处理记录)。集群环境中的大型 RDD 您需要考虑 foreach
中函数的成本,因为它为 RDD 中的每个元素运行该函数,如果执行 API 调用、日志记录、数据库操作等,您可能会有许多工作节点过载该资源(DDoS 风格的洪水、填满日志磁盘空间等)【参考方案2】:
您可以使用 RDD.toLocalIterator() 将数据带到驱动程序(一次一个 RDD 分区):
val array = sc.parallelize(List(1, 2, 3, 4))
for(rec <- array.toLocalIterator) println(rec)
另见
Spark: Best practice for retrieving big data from RDD to local machine this blog post关于toLocalIterator【讨论】:
以上是关于为啥 foreach 没有给驱动程序带来任何东西?的主要内容,如果未能解决你的问题,请参考以下文章