Spark KMeans 无法处理大数据吗?

Posted

技术标签:

【中文标题】Spark KMeans 无法处理大数据吗?【英文标题】:Is Spark's KMeans unable to handle bigdata? 【发布时间】:2017-01-08 16:56:30 【问题描述】:

KMeans 的training 有几个参数,初始化模式默认为kmeans||。问题是它快速前进(不到 10 分钟)到前 13 个阶段,但随后完全挂起,没有产生错误!

Minimal Example 重现问题(如果我使用 1000 点或随机初始化它将成功):

from pyspark.context import SparkContext

from pyspark.mllib.clustering import KMeans
from pyspark.mllib.random import RandomRDDs


if __name__ == "__main__":
    sc = SparkContext(appName='kmeansMinimalExample')

    # same with 10000 points
    data = RandomRDDs.uniformVectorRDD(sc, 10000000, 64)
    C = KMeans.train(data, 8192,  maxIterations=10)    

    sc.stop()

作业什么都不做(它没有成功、失败或进展..),如下所示。 Executors 选项卡中没有活动/失败的任务。 Stdout 和 Stderr Logs 没有什么特别有趣的东西:

如果我使用k=81,而不是8192,它会成功:

注意takeSample()、should not be an issue的两次调用,因为在随机初始化的情况下被调用了两次。

那么,发生了什么? Spark 的 Kmeans 无法扩展吗?有人知道吗?你能复制吗?


如果是内存问题,I would get warnings and errors, as I had been before。

注意:placeybordeaux 的 cmets 是基于 client 模式 下作业的执行,其中驱动程序的配置无效,导致退出代码 143 等(请参阅编辑历史记录),而不是集群模式,在根本没有报告错误的地方,应用程序只是挂起


从 zero323:Why is Spark Mllib KMeans algorithm extremely slow? 相关,但我认为他见证了一些进展,而我的挂起,我确实发表了评论...

【问题讨论】:

退出代码 143 表示容器因超出为作业分配的内存而被杀死。每个容器为堆大小分配了多少 RAM?你能增加这个吗? 迈克尔 1.6.2,更新!你说的其他分类器是什么意思?我只关心kmeans。 @placeybordeaux 确实如此。我想知道,在驱动程序或执行程序中?或两者?但是,请注意,这些消息似乎来自已完成,而不是失败的阶段。但是,我会尝试增加它以查看会发生什么...如果您发现这个问题很有趣,请大家投票,以引起更多关注并解开谜团! :) 可能没有一个简单的答案,您有两个主要因素会影响需要多少内存:数据点的数量和数据的维度。你能减少其中一个并获得一致的结果吗?或者增加每台机器可用的 RAM。您可能还想尝试减少执行程序上的核心数量。如果由于 GC 开销限制而导致失败,那么用更少的内存执行更多任务将无济于事。 @placeybordeaux 我无法减少#poitns,但我确实执行了降维,64 维听起来已经太低了。那么,您建议我尝试哪种配置?我的问题中有标志.. :) @Michael 我确实更新了我的问题,检查一下! 【参考方案1】:

我认为“挂起”是因为您的执行者不断死亡。正如我在旁白中提到的,这段代码对我来说运行良好,在本地和集群上,在 Pyspark 和 Scala 中。但是,它需要的时间比它应该的要长得多。几乎所有时间都花在了 k-means||初始化。

我打开https://issues.apache.org/jira/browse/SPARK-17389 跟踪两项主要改进,其中一项您现在可以使用。编辑:真的,另见https://issues.apache.org/jira/browse/SPARK-11560

首先,有一些代码优化可以将初始化速度提高大约 13%。

但是大多数问题是它默认为 5 步 k-means|| init,当它似乎 2 几乎总是一样好时。您可以将初始化步骤设置为 2 以查看加速,尤其是在现在挂起的阶段。

在我的笔记本电脑上的(较小的)测试中,初始化时间从 5:54 变为 1:41,这两个变化主要是由于设置了初始化步骤。

【讨论】:

【参考方案2】:

如果您的 RDD 太大,collectAsMap 将尝试将 RDD 中的每个元素复制到单个驱动程序中,然后耗尽内存并崩溃。即使您已经对数据进行了分区,collectAsMap 也会将所有内容发送给驱动程序,并且您的作业会崩溃。 您可以通过调用 take 或 takeSample,或者过滤或采样您的 RDD 来确保返回的元素数量受到限制。 同样,除非您确定您的数据集大小足够小以适合内存,否则也要小心这些其他操作:

countByKey, 计数值, 收集

如果您确实需要 RDD 的这些值中的每一个并且数据太大而无法放入内存,您可以将 RDD 写入文件或将 RDD 导出到足够大的数据库以容纳所有数据。当您使用 API 时,我认为您无法做到这一点(可能重写所有代码?增加内存?)。我认为 runAlgorithm 方法中的这个 collectAsMap 在 Kmeans 中是一件非常糟糕的事情(https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html)...

【讨论】:

感谢卡罗尔的回答!

以上是关于Spark KMeans 无法处理大数据吗?的主要内容,如果未能解决你的问题,请参考以下文章

R语言可以处理大的数据吗

金融需要 hadoop,spark 等这些大数据分析工具吗?使用场景是怎样的

Spark记录-大数据简介

「大数据分析」寻找数据优势:Spark和Flink终极对决

您需要HADOOP来运行SPARK吗?

从数据帧中激发 MLLib Kmeans,然后再返回