为啥这两种 Spark RDD 生成方式具有不同的数据局部性?
Posted
技术标签:
【中文标题】为啥这两种 Spark RDD 生成方式具有不同的数据局部性?【英文标题】:Why these two Spark RDDs generation ways have different data localities?为什么这两种 Spark RDD 生成方式具有不同的数据局部性? 【发布时间】:2016-12-12 15:15:47 【问题描述】:我在本地机器上运行两种不同的 RDD 生成方式,第一种方式是:
rdd = sc.range(0, 100).sortBy(lambda x: x, numPartitions=10)
rdd.collect()
第二种方式是:
rdd = sc.parallelize(xrange(100), 10)
rdd.collect()
但在我的 Spark UI 中,它显示了不同的数据位置,我不知道为什么。下面是第一种方式的结果,它显示 Locality Level(第 5 列)是 ANY
第二种方式的结果显示Locality Level是Process_Local:
我从https://spark.apache.org/docs/latest/tuning.html 中读到,Process_Local Level 通常比 Any Level 处理速度更快。
是不是因为 sortBy 操作会引起 shuffle 影响数据的局部性?谁能给我一个更清楚的解释?
【问题讨论】:
【参考方案1】:你是对的。
在第一个 sn-p 中,您首先创建一个并行化集合,这意味着您的驱动程序告诉每个工作人员创建集合的某些部分。那么,对于排序每个worker节点需要访问其他节点上的数据,数据需要打乱,数据局部性丢失。
第二个代码 sn-p 实际上甚至不是分布式作业。
由于 Spark 使用惰性求值,所以在调用具体化结果之前什么都不做,在本例中使用 collect 方法。您的第二次计算中的步骤是有效的
-
将列表类型的对象从驱动分发到工作节点
在每个工作节点上什么都不做
从工作人员那里收集分布式对象以在驱动程序上创建列表类型的对象。
Spark 足够聪明,可以意识到即使调用了并行化,也没有理由分发列表。由于数据驻留在同一个节点上,计算在同一个节点上完成,数据局部性显然得到了保留。
编辑: 有关 Spark 如何排序的一些附加信息。
Spark 在底层 MapReduce 模型(编程模型,而不是 Hadoop 实现)上运行,并且 sort 被实现为单个 map 和 reduce。从概念上讲,在映射阶段的每个节点上,特定节点操作的集合部分被排序并写入内存。然后,reducers 从映射器中提取相关数据,合并结果并创建迭代器。
因此,对于您的示例,假设您有一个映射器,它按排序顺序将数字 21-34 写入内存。假设同一个节点有一个负责数字 31-40 的减速器。 reducer 从驱动程序获取相关数据所在的信息。数字 31-34 是从同一个节点中提取的,数据只需要在线程之间传输。然而,其他号码可以在集群中的任意节点上,并且需要通过网络传输。一旦 reducer 从节点中提取了所有相关数据,shuffle 阶段就结束了。 reducer 现在合并结果(就像在 mergesort 中一样)并在集合的已排序部分上创建一个迭代器。
http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
【讨论】:
感谢您的友好回复。我还是不明白,你的意思是第二个代码不是分布式作业,Spark 不会分发这个列表。那么如果我在第二个代码中添加一个转换,然后再次调用 collect() 会怎样。转换会并行执行吗?从我的测试来看,我认为答案是肯定的,但是数据局部性仍然是最差的,我也不知道为什么。 在第二个代码中,如果在并行化集合之后调用一个简单的地图,例如,locality 将是 PROCESS_LOCAL。 更正:它通常是 PROCESS_LOCAL 因为你不能保证任何事情。 Spark Web UI 中的位置级别是实际的数据位置级别,例如,如果您有简单的映射,例如为 rdd 中的每个元素添加相同的数字,如果集群中有空闲的执行器,则数据可能会通过网络连接到空闲执行器,并且数据位置在 Spark Web UI 中不会是 PROCESS_LOCAL。我的意见是,您不必太担心 Spark 网络用户界面关于位置的内容。以上是关于为啥这两种 Spark RDD 生成方式具有不同的数据局部性?的主要内容,如果未能解决你的问题,请参考以下文章