为啥 local[*] 不使用我机器中的所有可用内核? [复制]

Posted

技术标签:

【中文标题】为啥 local[*] 不使用我机器中的所有可用内核? [复制]【英文标题】:Why doesn't local[*] use all the available cores in my machine? [duplicate]为什么 local[*] 不使用我机器中的所有可用内核? [复制] 【发布时间】:2017-05-29 11:11:54 【问题描述】:

如果这个问题已经得到解答,我深表歉意。我确实查看了存档,但没有找到特定于我的问题的答案。

我是 Spark 的新手。我正在尝试在我的 MacOS Sierra 机器中使用 spark-2.1.1 在本地并行运行附加的简单示例。由于我有 4 个核心,并且有 4 个任务,每个任务需要 10 秒,我希望总共花费 10 秒以上。

我看到每个任务都需要预期的时间。但在我看来,只有 2 个执行线程。我期待的是4。从代码中可以看到,每个元组的值就是对应任务的执行时间。

insight086:pyspark lquesada$ 更多输出/part-00000

(u'1', 10.000892877578735)
(u'3', 10.000878095626831)

insight086:pyspark lquesada$ 更多输出/part-00001

(u'2', 10.000869989395142)
(u'4', 10.000877857208252)

此外,这花费的总时间远远超过 20 秒:

total_time 33.2253439426

提前感谢您的帮助!

干杯, 路易斯

输入文件:

1
2
3
4

脚本:

from pyspark import SparkContext
import time

def mymap(word):
    start = time.time()
    time.sleep(10)
    et=time.time()-start
    return (word, et)

def main():
    start = time.time()
    sc = SparkContext(appName='SparkWordCount')

    input_file = sc.textFile('/Users/lquesada/Dropbox/hadoop/pyspark/input.txt')
    counts = input_file.flatMap(lambda line: line.split()) \
                     .map(mymap) \
                     .reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile('/Users/lquesada/Dropbox/hadoop/pyspark/output')

    sc.stop()
    print 'total_time',time.time()-start

if __name__ == '__main__':
   main()

【问题讨论】:

这个数据集太小了,不可能用它来证明任何东西...... 我的实际问题是使用的核心数。但是,我确实很欣赏您指出“在扩展核心数量时性能数字不一致”这一事实,因为这肯定与我对开销的担忧有关。 【参考方案1】:

这就是为什么Divide and conquer algorithms 有它们的阈值,可以完全使用它们。将分布添加到 Spark 中的混合(具有并行性)中,您就有了相当多的机器来进行如此小的计算。在这个 4 元素数据集上,您根本没有利用 Spark 的优势。

假设随着数据集越来越大,时间将收敛于您的预期。

此外,读取本地数据集时的分区数最多为 2 个,因此如果没有repartitioning,您只能使用 2 个内核。

repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 返回一个新的 RDD 正好有 numPartitions 个分区。

可以提高或降低此 RDD 中的并行度。在内部,这使用 shuffle 重新分配数据。

如果您要减少此 RDD 中的分区数,请考虑使用 coalesce,这样可以避免执行 shuffle。


local[*] 表示使用与您的计算机一样多的内核(参见SparkContext 中LOCAL_N_REGEX 的案例):

def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
val threadCount = if (threads == "*") localCpuCount else threads.toInt

这只是提示默认使用多少个分区,但不阻止 Spark 上升或下降。它主要取决于 Spark 应用的优化,以最终为您的分布式计算提供最佳执行计划。 Spark 为您做了很多,抽象级别越高,优化越多(参见 Spark SQL 优化器中的batches)。

【讨论】:

以上是关于为啥 local[*] 不使用我机器中的所有可用内核? [复制]的主要内容,如果未能解决你的问题,请参考以下文章

Shell编程手册

《机器学习实战》 书上的Apriori算法 内循环为啥只执行了一次

为啥我看不到 XCode 中的类中存在的所有可用方法或属性?

为啥mysql不回收ibd文件中的可用空间?

为啥设置方法中的“route.params.id”不可用/未定义?

为啥在使用 XMPP 时多次发送聊天时会出现“服务不可用”?