Apache Spark - MLlib - K-Means Input format

【Posted】: 2014-09-30 20:35:26 【Question】:

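I want to run a K-Means task, but training the model fails and I get kicked out of Spark's Scala shell before I obtain the result metrics. I am not sure whether the input format is the problem or something else. I use Spark 1.0.0, and my input text file (400 MB) looks like this: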

ID,Category,PruductSize,PurchaseAMount
86252,3711,15.4,4.18
86252,3504,28,1.25
86252,3703,10.75,8.85
86252,3703,10.5,5.55
86252,2201,64,2.79
12262064,7203,32,8.49
etc.

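I am not sure whether I can use the first two columns, because the MLlib example file only uses floats. So I also tried with only the last two columns: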

16 2.49
64 3.29
56 1
etc.
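
For reference, the parsing step in the transcript below splits each line on a single space, which fits the two-column file but would not handle the comma-separated file or its header line. The following is a minimal sketch of a more tolerant parser. `FeatureParser` and `lastNumericFields` are hypothetical names introduced here for illustration; they are not part of MLlib.

```scala
import scala.util.Try

// Hypothetical helper, not part of MLlib: turns one input line into numeric features.
object FeatureParser {
  // Split on commas or whitespace, keep only the last `n` fields, and convert
  // them to Double. Returns None for lines that are too short or not numeric,
  // for example the header "ID,Category,PruductSize,PurchaseAMount".
  def lastNumericFields(line: String, n: Int): Option[Array[Double]] = {
    val fields = line.trim.split("[,\\s]+").filter(_.nonEmpty)
    if (fields.length < n) None
    else Try(fields.takeRight(n).map(_.toDouble)).toOption
  }
}
```

In the Spark shell this could feed K-Means roughly as `data.flatMap(l => FeatureParser.lastNumericFields(l, 2)).map(a => Vectors.dense(a))`, assuming `sc`, `data`, and the imports from the transcript. Dropping the ID and Category columns is a modelling choice made for this sketch, since those values are identifiers rather than measurements.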

Here is my error output, which is the same in both cases:

scala> import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.clustering.KMeans

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> 

scala> // Load and parse the data

scala> val data = sc.textFile("data/outkmeanssm.txt")
14/08/07 16:15:37 INFO MemoryStore: ensureFreeSpace(35456) called with curMem=0, maxMem=318111744
14/08/07 16:15:37 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 34.6 KB, free 303.3 MB)
data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14

scala> val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at <console>:16

scala> 

scala> // Cluster the data into two classes using KMeans

scala> val numClusters = 2
numClusters: Int = 2

scala> val numIterations = 20
numIterations: Int = 20

scala> val clusters = KMeans.train(parsedData, numClusters, numIterations)
14/08/07 16:15:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/08/07 16:15:38 WARN LoadSnappy: Snappy native library not loaded
14/08/07 16:15:38 INFO FileInputFormat: Total input paths to process : 1
14/08/07 16:15:38 INFO SparkContext: Starting job: takeSample at KMeans.scala:260
14/08/07 16:15:38 INFO DAGScheduler: Got job 0 (takeSample at KMeans.scala:260) with 7 output partitions (allowLocal=false)
14/08/07 16:15:38 INFO DAGScheduler: Final stage: Stage 0(takeSample at KMeans.scala:260)
14/08/07 16:15:38 INFO DAGScheduler: Parents of final stage: List()
14/08/07 16:15:38 INFO DAGScheduler: Missing parents: List()
14/08/07 16:15:38 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at map at KMeans.scala:123), which has no missing parents
14/08/07 16:15:39 INFO DAGScheduler: Submitting 7 missing tasks from Stage 0 (MappedRDD[6] at map at KMeans.scala:123)
14/08/07 16:15:39 INFO TaskSchedulerImpl: Adding task set 0.0 with 7 tasks
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:0 as 2221 bytes in 3 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:1 as 2221 bytes in 0 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:2 as TID 2 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:2 as 2221 bytes in 0 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:3 as TID 3 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:3 as 2221 bytes in 1 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:4 as TID 4 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:4 as 2221 bytes in 0 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:5 as 2221 bytes in 0 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:6 as 2221 bytes in 0 ms
14/08/07 16:15:39 INFO Executor: Running task ID 4
14/08/07 16:15:39 INFO Executor: Running task ID 1
14/08/07 16:15:39 INFO Executor: Running task ID 5
14/08/07 16:15:39 INFO Executor: Running task ID 6
14/08/07 16:15:39 INFO Executor: Running task ID 0
14/08/07 16:15:39 INFO Executor: Running task ID 3
14/08/07 16:15:39 INFO Executor: Running task ID 2
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:0+33554432
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:100663296+33554432
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:201326592+24305610
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:33554432+33554432
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:67108864+33554432
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:134217728+33554432
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:167772160+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_0 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:0+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_2 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:67108864+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_1 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:33554432+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_4 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:134217728+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_6 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:201326592+24305610
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_3 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:100663296+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_5 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:167772160+33554432
14/08/07 16:16:53 ERROR Executor: Exception in task ID 5
java.lang.OutOfMemoryError: Java heap space
    at scala.collection.mutable.ResizableArray$class.ensureSize(ResizableArray.scala:99)
    at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:47)
    at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:83)
    at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.rdd.ZippedRDD.compute(ZippedRDD.scala:66)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
14/08/07 16:16:59 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-5,5,main]
java.lang.OutOfMemoryError: Java heap space
    at scala.collection.mutable.ResizableArray$class.ensureSize(ResizableArray.scala:99)
    at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:47)
    at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:83)
    at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.rdd.ZippedRDD.compute(ZippedRDD.scala:66)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
14/08/07 16:17:00 WARN TaskSetManager: Lost TID 5 (task 0.0:5)
Chairs-MacBook-Pro:spark-1.0.0 admin$ 
Chairs-MacBook-Pro:spark-1.0.0 admin$ // Evaluate clustering by computing Within Set Sum of Squared Errors
-bash: //: is a directory
Chairs-MacBook-Pro:spark-1.0.0 admin$ val WSSSE = clusters.computeCost(parsedData)
-bash: syntax error near unexpected token `('
Chairs-MacBook-Pro:spark-1.0.0 admin$ println("Within Set Sum of Squared Errors = " + WSSSE)

What am I missing?

【Answer 1】:

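The "java.lang.OutOfMemoryError: Java heap space" error you are facing is triggered when you try to add more data to the heap space area in memory, but the size of this data is larger than the JVM can accommodate in the Java heap space.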
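
The numbers in the question's log give a rough sense of scale. The sketch below only does arithmetic on values copied from that log (`maxMem`, the split size, and the offset and length of the last input split); the object and field names are made up for illustration. The stack trace shows `CacheManager.getOrCompute` appending to an `ArrayBuffer`, which suggests that whole partitions were being materialized in memory at the time of the failure. How much heap the parsed vectors need compared with the raw text is not shown in the log and is left as an open assumption here.

```scala
// Values copied from the log in the question; the derived figures are plain arithmetic.
object HeapArithmetic {
  val storageBytes: Long = 318111744L              // maxMem reported by MemoryStore
  val splitBytes: Long   = 33554432L               // length of each full input split (32 MB)
  val inputBytes: Long   = 201326592L + 24305610L  // offset + length of the last split

  private val bytesPerMb: Double = 1024.0 * 1024.0

  // Convert a byte count to megabytes.
  def toMb(bytes: Long): Double = bytes / bytesPerMb

  // Ceiling division: the number of input splits, and hence of tasks.
  val numSplits: Long = (inputBytes + splitBytes - 1) / splitBytes

  // Human-readable summary of the three figures.
  def report: String =
    f"storage memory ${toMb(storageBytes)}%.1f MB, input ${toMb(inputBytes)}%.1f MB in $numSplits splits"
}
```

Calling `println(HeapArithmetic.report)` prints the summary. The computed split count agrees with the "7 output partitions" line in the log, and the log also shows all seven tasks running at once on `localhost`, so the partitions are being built concurrently inside a single JVM.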

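This is because applications deployed on the Java Virtual Machine are only allowed to use a limited amount of memory. This limit is specified during application startup. To make things more complicated, Java memory is separated into two different regions, one of which is called the heap. And you have exhausted the heap.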

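The first solution should be obvious: when you run out of a particular resource, you should increase the availability of that resource. In our case, when your application does not have enough Java heap space to run properly, fixing it is as easy as altering your JVM launch configuration and adding (or increasing, if already present) the following: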

-Xmx1024m
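
To confirm that a new limit actually took effect, the running JVM can be asked for its maximum heap. This is a minimal sketch using the standard `Runtime` API; `HeapCheck` is a hypothetical name. Note that with the Spark shell the heap is usually set through the launcher rather than by passing `-Xmx` directly. Treat the `--driver-memory` option of `spark-shell` / `spark-submit` as the likely route, and verify it against the documentation for your Spark version.

```scala
// Hypothetical helper for checking the heap limit of the current JVM.
object HeapCheck {
  // Maximum amount of memory the JVM will attempt to use for the heap, in MB.
  // The value reflects -Xmx (or whatever the launcher translated into it).
  def maxHeapMb: Long = Runtime.getRuntime.maxMemory / (1024L * 1024L)
}
```

Running `println(HeapCheck.maxHeapMb)` inside the shell before calling `KMeans.train` shows the limit in effect. Depending on the garbage collector, the reported value can be somewhat lower than the configured `-Xmx` figure.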
