从数据帧中激发 MLLib Kmeans,然后再返回

Posted

技术标签:

【中文标题】从数据帧中激发 MLLib Kmeans,然后再返回【英文标题】:Spark MLLib Kmeans from dataframe, and back again 【发布时间】:2015-10-05 11:33:47 【问题描述】:

我的目标是使用 Spark (1.3.1) MLLib 将 kmeans 聚类算法应用于非常大的数据集。我已经使用 Spark 中的 hiveContext 调用了 HDFS 中的数据,并且最终希望以这种方式将其放回原处 - 以这种格式

    |I.D     |cluster |
    ===================
    |546     |2       |
    |6534    |4       |
    |236     |5       |
    |875     |2       |

我运行了以下代码,其中“数据”是双精度数据帧,第一列的 ID。

    val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
    val clusters = KMeans.train(parsedData, 3, 20)

这运行成功,我现在被困在如上所述的数据框中将集群映射回它们各自的 ID。我可以将其转换为数据框:

    sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()

但这就是我所知道的。 This post 是在正确的轨道上,this post 我想我也在问一个类似的问题。

我怀疑需要labeledPoint 库。任何cmets,答案将不胜感激,干杯。

编辑:刚刚在 Spark 用户列表中找到 this,看起来很有希望

【问题讨论】:

【参考方案1】:

我了解您希望最后获得 DataFrame。我看到了两种可能的解决方案。我会说在它们之间进行选择是品味问题。

从 RDD 创建列

很容易得到RDD形式的id和cluster对:

val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1),s.getDouble(2)))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)

然后你从那个创建DataFrame

val idCluster = idClusterRDD.toDF("id", "cluster")

之所以有效,是因为 map 不会改变 RDD 中数据的顺序,这就是为什么您可以使用预测结果压缩 id。

使用 UDF(用户定义函数)

第二种方法涉及使用clusters.predict方法作为UDF:

val bcClusters = sc.broadcast(clusters)
def predict(x: Double, y: Double): Int = 
    bcClusters.value.predict(Vectors.dense(x, y))

sqlContext.udf.register("predict", predict _)

现在我们可以使用它来为数据添加预测:

val idCluster = data.selectExpr("id", "predict(x, y) as cluster")

请记住,Spark API 不允许取消注册 UDF。这意味着闭包数据将保存在内存中。

错误/不理想的解决方案

使用 clusters.predict 而不广播

它在分布式设置中不起作用。编辑:实际上它会起作用,我被 implementation of predict for RDD 弄糊涂了,它使用广播。

sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()

toArray 收集驱动程序中的所有数据。这意味着在分布式模式下,您会将集群 ID 复制到一个节点中。

【讨论】:

【参考方案2】:

根据您的代码,我假设:

data 是一个包含三列的 DataFrame(label: Doublex1: Doublex2: Double) 您希望KMeans.predict 使用x1x2 来进行集群分配closestCluster: Int 结果数据框的格式应为 (label: Double, closestCluster: Int)

这是一个简单的示例应用程序,其中包含一些遵循假定架构的玩具数据:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.functions.col, udf

case class DataRow(label: Double, x1: Double, x2: Double)
val data = sqlContext.createDataFrame(sc.parallelize(Seq(
    DataRow(3, 1, 2),
    DataRow(5, 3, 4),
    DataRow(7, 5, 6),
    DataRow(6, 0, 0)
)))

val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
val clusters = KMeans.train(parsedData, 3, 20)
val t = udf  (x1: Double, x2: Double) => clusters.predict(Vectors.dense(x1, x2)) 
val result = data.select(col("label"), t(col("x1"), col("x2")))

重要的部分是最后两行。

    创建一个 UDF(用户定义函数),可以直接应用于 Dataframe 列(在本例中为 x1x2 两列)。

    选择label 列以及应用于x1x2 列的UDF。由于 UDF 将预测 closestCluster,因此在此之后 result 将是一个由 (label, closestCluster) 组成的 Dataframe

【讨论】:

【参考方案3】:

让我知道此代码是否适合您:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering._

val rows = data.rdd.map(r => (r.getDouble(1),r.getDouble(2))).cache()
val vectors = rows.map(r => Vectors.dense(r._1, r._2))
val kMeansModel = KMeans.train(vectors, 3, 20)
val predictions = rows.mapr => (r._1, kMeansModel.predict(Vectors.dense(r._1, r._2)))
val df = predictions.toDF("id", "cluster")
df.show

【讨论】:

【参考方案4】:

我正在使用 pySpark 做类似的事情。我猜您可以直接将其翻译为 Scala,因为没有特定于 python 的内容。 myPointsWithID 是我的 RDD,每个点都有一个 ID,点表示为值数组。

# Get an RDD of only the vectors representing the points to be clustered
points = myPointsWithID.map(lambda (id, point): point)
clusters = KMeans.train(points, 
                        100, 
                        maxIterations=100, 
                        runs=50,
                        initializationMode='random')

# For each point in the original RDD, replace the point with the
# ID of the cluster the point belongs to. 
clustersBC = sc.broadcast(clusters)
pointClusters = myPointsWithID.map(lambda (id, point): (id, clustersBC.value.predict(point)))

【讨论】:

以上是关于从数据帧中激发 MLLib Kmeans,然后再返回的主要内容,如果未能解决你的问题,请参考以下文章

Spark Mllib kmeans 示例,使用数据框而不是 textFile

Apache Spark MLLib - 使用 IDF-TF 向量运行 KMeans - Java 堆空间

3 分钟学会调用 Apache Spark MLlib KMeans

spark.mllib源码阅读-聚类算法1-KMeans

spark.mllib源码阅读-聚类算法1-KMeans

为啥 Spark Mllib KMeans 算法非常慢?