为啥 local[*] 不使用我机器中的所有可用内核? [复制]
Posted
技术标签:
【中文标题】为啥 local[*] 不使用我机器中的所有可用内核? [复制]【英文标题】:Why doesn't local[*] use all the available cores in my machine? [duplicate]为什么 local[*] 不使用我机器中的所有可用内核? [复制] 【发布时间】:2017-05-29 11:11:54 【问题描述】:如果这个问题已经得到解答,我深表歉意。我确实查看了存档,但没有找到特定于我的问题的答案。
我是 Spark 的新手。我正在尝试在我的 MacOS Sierra 机器中使用 spark-2.1.1 在本地并行运行附加的简单示例。由于我有 4 个核心,并且有 4 个任务,每个任务需要 10 秒,我希望总共花费 10 秒以上。
我看到每个任务都需要预期的时间。但在我看来,只有 2 个执行线程。我期待的是4。从代码中可以看到,每个元组的值就是对应任务的执行时间。
insight086:pyspark lquesada$ 更多输出/part-00000
(u'1', 10.000892877578735)
(u'3', 10.000878095626831)
insight086:pyspark lquesada$ 更多输出/part-00001
(u'2', 10.000869989395142)
(u'4', 10.000877857208252)
此外,这花费的总时间远远超过 20 秒:
total_time 33.2253439426
提前感谢您的帮助!
干杯, 路易斯
输入文件:
1
2
3
4
脚本:
from pyspark import SparkContext
import time
def mymap(word):
start = time.time()
time.sleep(10)
et=time.time()-start
return (word, et)
def main():
start = time.time()
sc = SparkContext(appName='SparkWordCount')
input_file = sc.textFile('/Users/lquesada/Dropbox/hadoop/pyspark/input.txt')
counts = input_file.flatMap(lambda line: line.split()) \
.map(mymap) \
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile('/Users/lquesada/Dropbox/hadoop/pyspark/output')
sc.stop()
print 'total_time',time.time()-start
if __name__ == '__main__':
main()
【问题讨论】:
这个数据集太小了,不可能用它来证明任何东西...... 我的实际问题是使用的核心数。但是,我确实很欣赏您指出“在扩展核心数量时性能数字不一致”这一事实,因为这肯定与我对开销的担忧有关。 【参考方案1】:这就是为什么Divide and conquer algorithms 有它们的阈值,可以完全使用它们。将分布添加到 Spark 中的混合(具有并行性)中,您就有了相当多的机器来进行如此小的计算。在这个 4 元素数据集上,您根本没有利用 Spark 的优势。
假设随着数据集越来越大,时间将收敛于您的预期。
此外,读取本地数据集时的分区数最多为 2 个,因此如果没有repartitioning,您只能使用 2 个内核。
repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 返回一个新的 RDD 正好有 numPartitions 个分区。
可以提高或降低此 RDD 中的并行度。在内部,这使用 shuffle 重新分配数据。
如果您要减少此 RDD 中的分区数,请考虑使用 coalesce,这样可以避免执行 shuffle。
local[*]
表示使用与您的计算机一样多的内核(参见SparkContext
中LOCAL_N_REGEX 的案例):
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
val threadCount = if (threads == "*") localCpuCount else threads.toInt
这只是提示默认使用多少个分区,但不阻止 Spark 上升或下降。它主要取决于 Spark 应用的优化,以最终为您的分布式计算提供最佳执行计划。 Spark 为您做了很多,抽象级别越高,优化越多(参见 Spark SQL 优化器中的batches)。
【讨论】:
以上是关于为啥 local[*] 不使用我机器中的所有可用内核? [复制]的主要内容,如果未能解决你的问题,请参考以下文章
《机器学习实战》 书上的Apriori算法 内循环为啥只执行了一次
为啥我看不到 XCode 中的类中存在的所有可用方法或属性?