为啥在具有 1 行的 DataFrame 上的 collect() 使用 2000 个执行器?
Posted
技术标签:
【中文标题】为啥在具有 1 行的 DataFrame 上的 collect() 使用 2000 个执行器?【英文标题】:Why does collect() on a DataFrame with 1 row use 2000 exectors?为什么在具有 1 行的 DataFrame 上的 collect() 使用 2000 个执行器? 【发布时间】:2016-06-21 00:40:32 【问题描述】:这是我能想到的最简单的 DataFrame。我正在使用 PySpark 1.6.1。
# one row of data
rows = [ (1, 2) ]
cols = [ "a", "b" ]
df = sqlContext.createDataFrame(rows, cols)
所以数据框完全适合内存,没有对任何文件的引用,对我来说看起来很微不足道。
然而,当我收集数据时,它使用了 2000 个执行器:
df.collect()
在收集期间,使用了 2000 个执行器:
[Stage 2:===================================================>(1985 + 15) / 2000]
然后是预期的输出:
[Row(a=1, b=2)]
为什么会这样? DataFrame不应该完全在驱动程序的内存中吗?
【问题讨论】:
【参考方案1】:所以我稍微研究了一下代码,试图弄清楚发生了什么。看来sqlContext.createDataFrame
确实没有做任何尝试根据数据设置合理的参数值。
为什么是 2000 个任务?
Spark 使用 2000 个任务,因为我的数据框有 2000 个分区。 (尽管分区数多于行数似乎是一派胡言。)
这可以通过以下方式看到:
>>> df.rdd.getNumPartitions()
2000
为什么 DataFrame 有 2000 个分区?
发生这种情况是因为 sqlContext.createDataFrame
最终使用默认分区数(在我的例子中为 2000),而不管数据的组织方式或它有多少行。
代码轨迹如下。
在sql/context.py
中,sqlContext.createDataFrame
函数调用(在本例中):
rdd, schema = self._createFromLocal(data, schema)
依次调用:
return self._sc.parallelize(data), schema
而sqlContext.parallelize
函数定义在context.py
:
numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism
不检查行数,无法从sqlContext.createDataFrame
指定切片数。
如何更改 DataFrame 的分区数?
使用DataFrame.coalesce
。
>>> smdf = df.coalesce(1)
>>> smdf.rdd.getNumPartitions()
1
>>> smdf.explain()
== Physical Plan ==
Coalesce 1
+- Scan ExistingRDD[a#0L,b#1L]
>>> smdf.collect()
[Row(a=1, b=2)]
【讨论】:
【参考方案2】:您可以配置执行器的数量。在很多情况下,spark 会采用尽可能多的 executor,而执行时间比限制少数 executor 时要差很多。
conf = SparkConf()
conf.set('spark.dynamicAllocation.enabled','true')
conf.set('spark.dynamicAllocation.maxExecutors','32')
【讨论】:
好的.. 但为什么这是它使用许多执行器的众多案例之一?没有分布式文件,DataFrame 适合内存。我本来希望它使用 0 个(或者可能是 1 个)执行器。 这取决于你的 sparkconf 而不是工作本身 嗯。 Spark 在资源管理方面并没有更智能,这让我有点震惊。我原以为它正在尝试处理最接近该分区存储位置的节点上的每个分布式数据分区。因此,没有分布式数据的情况将是微不足道的。我想我误解了 Spark 的经理是做什么的……以上是关于为啥在具有 1 行的 DataFrame 上的 collect() 使用 2000 个执行器?的主要内容,如果未能解决你的问题,请参考以下文章
为啥在具有一级索引的 MultiIndex 列的 pandas DataFrame 中表现不同?
为啥在另一个快照隔离事务中插入具有引用行的外键引用行的行会导致事务挂起?