将 kmeans 模型注册为 UDF

Posted

技术标签:

【中文标题】将 kmeans 模型注册为 UDF【英文标题】:Registering kmeans model as UDF 【发布时间】:2015-06-29 09:23:04 【问题描述】:

您好,我正在尝试使用 Spark kmeans 模型来预测集群数。但是当我注册它并在 SQL 中使用它时,它给了我一个

java.lang.reflect.InvocationTargetException

def findCluster(s:String):Int=
    model.predict(feautarize(s))

我正在使用下面的

%sql select findCluster((text)) from tweets

直接用也一样

findCluster("hello am vishnu")

输出 1

【问题讨论】:

【参考方案1】:

您提供的代码无法重现该问题。假设modelorg.apache.spark.mllib.clustering.KMeansModel这里是一步一步的解决方案

首先让我们导入所需的库并设置 RNG 种子:

import scala.util.Random
import org.apache.spark.mllib.clustering.KMeans, KMeansModel
import org.apache.spark.mllib.linalg.Vectors

Random.setSeed(0L)

生成随机训练集:

// Generate random training set
val trainData = sc.parallelize((1 to 1000).map  _ =>
    val off = if(Random.nextFloat > 0.5) 0.5 else -0.5
    Vectors.dense(Random.nextFloat + off, Random.nextFloat + off)
)

运行 KMeans

// Train KMeans with 2 clusters

val numClusters = 2
val numIterations = 20

val clusters = KMeans.train(trainData, numClusters, numIterations)

创建 UDF

// Create broadcast variable with model and prediction function 
val model = sc.broadcast(clusters)
def findCluster(v: org.apache.spark.mllib.linalg.Vector):Int=
    model.value.predict(v)


// Register UDF
sqlContext.udf.register("findCluster", findCluster _)

准备测试集

// Create test set
case class Coord(v: org.apache.spark.mllib.linalg.Vector)
val testData = sqlContext.createDataFrame(sc.parallelize((1 to 100).map  _ =>
    val off = if(Random.nextFloat > 0.5) 0.5 else -0.5
    Coord(Vectors.dense(Random.nextFloat + off, Random.nextFloat + off))
))

// Register test set df
testData.registerTempTable("testData")

// Check if it works
sqlContext.sql("SELECT findCluster(v) FROM testData").take(1)

结果:

res3: Array[org.apache.spark.sql.Row] = Array([1])

【讨论】:

嘿,谢谢,它现在可以工作了。错误在于我使用 Zeppelin 的方式。 很高兴听到这个消息。如果您提供一些解释作为单独的答案,以防将来有人遇到类似问题,这可能会很有用。

以上是关于将 kmeans 模型注册为 UDF的主要内容,如果未能解决你的问题,请参考以下文章

如何将Python算法模型注册成Spark UDF函数实现全景模型部署

如何将Python算法模型注册成Spark UDF函数实现全景模型部署

如何将Python算法模型注册成Spark UDF函数实现全景模型部署

是否可以将字符串注册为 UDF?

从 Scala 将 UDF 注册到 SqlContext 以在 PySpark 中使用

在 sparkSession 上注册两个同名 udf