从数据帧中激发 MLLib Kmeans,然后再返回
Posted
技术标签:
【中文标题】从数据帧中激发 MLLib Kmeans,然后再返回【英文标题】:Spark MLLib Kmeans from dataframe, and back again 【发布时间】:2015-10-05 11:33:47 【问题描述】:我的目标是使用 Spark (1.3.1) MLLib 将 kmeans 聚类算法应用于非常大的数据集。我已经使用 Spark 中的 hiveContext 调用了 HDFS 中的数据,并且最终希望以这种方式将其放回原处 - 以这种格式
|I.D |cluster |
===================
|546 |2 |
|6534 |4 |
|236 |5 |
|875 |2 |
我运行了以下代码,其中“数据”是双精度数据帧,第一列的 ID。
val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
val clusters = KMeans.train(parsedData, 3, 20)
这运行成功,我现在被困在如上所述的数据框中将集群映射回它们各自的 ID。我可以将其转换为数据框:
sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()
但这就是我所知道的。 This post 是在正确的轨道上,this post 我想我也在问一个类似的问题。
我怀疑需要labeledPoint 库。任何cmets,答案将不胜感激,干杯。
编辑:刚刚在 Spark 用户列表中找到 this,看起来很有希望
【问题讨论】:
【参考方案1】:我了解您希望最后获得 DataFrame。我看到了两种可能的解决方案。我会说在它们之间进行选择是品味问题。
从 RDD 创建列
很容易得到RDD形式的id和cluster对:
val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1),s.getDouble(2)))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)
然后你从那个创建DataFrame
val idCluster = idClusterRDD.toDF("id", "cluster")
之所以有效,是因为 map 不会改变 RDD 中数据的顺序,这就是为什么您可以使用预测结果压缩 id。
使用 UDF(用户定义函数)
第二种方法涉及使用clusters.predict
方法作为UDF:
val bcClusters = sc.broadcast(clusters)
def predict(x: Double, y: Double): Int =
bcClusters.value.predict(Vectors.dense(x, y))
sqlContext.udf.register("predict", predict _)
现在我们可以使用它来为数据添加预测:
val idCluster = data.selectExpr("id", "predict(x, y) as cluster")
请记住,Spark API 不允许取消注册 UDF。这意味着闭包数据将保存在内存中。
错误/不理想的解决方案
它在分布式设置中不起作用。编辑:实际上它会起作用,我被 implementation of predict for RDD 弄糊涂了,它使用广播。
sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()
toArray
收集驱动程序中的所有数据。这意味着在分布式模式下,您会将集群 ID 复制到一个节点中。
【讨论】:
【参考方案2】:根据您的代码,我假设:
data
是一个包含三列的 DataFrame(label: Double
、x1: Double
和 x2: Double
)
您希望KMeans.predict
使用x1
和x2
来进行集群分配closestCluster: Int
结果数据框的格式应为 (label: Double
, closestCluster: Int
)
这是一个简单的示例应用程序,其中包含一些遵循假定架构的玩具数据:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.functions.col, udf
case class DataRow(label: Double, x1: Double, x2: Double)
val data = sqlContext.createDataFrame(sc.parallelize(Seq(
DataRow(3, 1, 2),
DataRow(5, 3, 4),
DataRow(7, 5, 6),
DataRow(6, 0, 0)
)))
val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
val clusters = KMeans.train(parsedData, 3, 20)
val t = udf (x1: Double, x2: Double) => clusters.predict(Vectors.dense(x1, x2))
val result = data.select(col("label"), t(col("x1"), col("x2")))
重要的部分是最后两行。
创建一个 UDF(用户定义函数),可以直接应用于 Dataframe 列(在本例中为 x1
和 x2
两列)。
选择label
列以及应用于x1
和x2
列的UDF。由于 UDF 将预测 closestCluster
,因此在此之后 result
将是一个由 (label
, closestCluster
) 组成的 Dataframe
【讨论】:
【参考方案3】:让我知道此代码是否适合您:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering._
val rows = data.rdd.map(r => (r.getDouble(1),r.getDouble(2))).cache()
val vectors = rows.map(r => Vectors.dense(r._1, r._2))
val kMeansModel = KMeans.train(vectors, 3, 20)
val predictions = rows.mapr => (r._1, kMeansModel.predict(Vectors.dense(r._1, r._2)))
val df = predictions.toDF("id", "cluster")
df.show
【讨论】:
【参考方案4】:我正在使用 pySpark 做类似的事情。我猜您可以直接将其翻译为 Scala,因为没有特定于 python 的内容。 myPointsWithID 是我的 RDD,每个点都有一个 ID,点表示为值数组。
# Get an RDD of only the vectors representing the points to be clustered
points = myPointsWithID.map(lambda (id, point): point)
clusters = KMeans.train(points,
100,
maxIterations=100,
runs=50,
initializationMode='random')
# For each point in the original RDD, replace the point with the
# ID of the cluster the point belongs to.
clustersBC = sc.broadcast(clusters)
pointClusters = myPointsWithID.map(lambda (id, point): (id, clustersBC.value.predict(point)))
【讨论】:
以上是关于从数据帧中激发 MLLib Kmeans,然后再返回的主要内容,如果未能解决你的问题,请参考以下文章
Spark Mllib kmeans 示例,使用数据框而不是 textFile
Apache Spark MLLib - 使用 IDF-TF 向量运行 KMeans - Java 堆空间