type mismatch; found: org.apache.spark.sql.DataFrame required: org.apache.spark.rdd.RDD
Posted: 2019-04-30 23:16:09

【Question】: I am new to Scala and MLlib and I keep running into the errors below. Please let me know if anyone has managed to resolve a similar issue.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
.
.
.
val conf = new SparkConf().setMaster("local").setAppName("SampleApp")
val sContext = new SparkContext(conf)
val sc = SparkSession.builder().master("local").appName("SampleApp").getOrCreate()
val sampleData = sc.read.json("input/sampleData.json")
val clusters = KMeans.train(sampleData, 10, 10)
val WSSSE = clusters.computeCost(sampleData)
clusters.save(sContext, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sContext, "target/org/apache/spark/KMeansExample/KMeansModel")
The call to KMeans.train above gives this error:
type mismatch; found : org.apache.spark.sql.DataFrame (which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
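The old spark.mllib KMeans.train works on an RDD[Vector], not a DataFrame, so the DataFrame has to be mapped into vectors first. A minimal sketch, assuming the JSON exposes numeric columns such as stars and review_count (both column names are hypothetical):

// Map each Row into an mllib Vector; the column names are placeholders.
val vectorData = sampleData.rdd.map { row =>
  Vectors.dense(
    row.getAs[Double]("stars"),
    row.getAs[Long]("review_count").toDouble
  )
}.cache()

val clusters = KMeans.train(vectorData, 10, 10) // RDD[Vector] matches the expected signature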
So instead I tried the newer spark.ml API:
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
val kmeans = new KMeans().setK(20)
val model = kmeans.fit(sampleData)
val predictions = model.transform(sampleData)
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
This gives the error:
Exception in thread "main" java.lang.IllegalArgumentException: Field "features" does not exist.
Available fields: address, attributes, business_id
at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.spark.sql.types.StructType.apply(StructType.scala:266)
at org.apache.spark.ml.util.SchemaUtils$.checkColumnTypes(SchemaUtils.scala:58)
at org.apache.spark.ml.util.SchemaUtils$.validateVectorCompatibleColumn(SchemaUtils.scala:119)
at org.apache.spark.ml.clustering.KMeansParams$class.validateAndTransformSchema(KMeans.scala:96)
at org.apache.spark.ml.clustering.KMeans.validateAndTransformSchema(KMeans.scala:285)
at org.apache.spark.ml.clustering.KMeans.transformSchema(KMeans.scala:382)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:341)
at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:340)
at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:340)
I have been referring to https://spark.apache.org/docs/latest/ml-clustering.html and https://spark.apache.org/docs/latest/mllib-clustering.html.
EDIT
Using setFeaturesCol():
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.feature.VectorAssembler
val assembler = new VectorAssembler()
.setInputCols(Array("is_open", "review_count", "stars"))
.setOutputCol("features")
val output = assembler.transform(sampleData).select("features")
val kmeans = new KMeans().setK(20).setFeaturesCol("features")
val model = kmeans.fit(output)
val predictions = model.transform(sampleData)
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")
This still produces a different error:
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.Utils$.getSimpleName(Ljava/lang/Class;)Ljava/lang/String;
at org.apache.spark.ml.util.Instrumentation.logPipelineStage(Instrumentation.scala:52)
at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:350)
at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:340)
at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:340)
Thanks.
【Comments】:

Please read What's the difference between Spark ML and MLLIB packages and pick one or the other, with the appropriate distributed data structure.

Use the newer ml library (as in your second attempt). For KMeans you need to specify which column to use as input (it defaults to "features", hence the "features does not exist" error). Use setFeaturesCol() to do that.
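The NoSuchMethodError on org.apache.spark.util.Utils$.getSimpleName usually points to mismatched Spark artifact versions on the classpath (for example, spark-mllib from a newer release than spark-core) rather than a mistake in the code itself. A minimal build.sbt sketch that keeps them aligned — the version shown is illustrative:

// build.sbt — keep every Spark module on the same version; mixing
// releases of spark-core and spark-mllib is a classic cause of NoSuchMethodError.
val sparkVersion = "2.4.0" // illustrative; use whichever release you actually target

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % sparkVersion,
  "org.apache.spark" %% "spark-sql"   % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)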
【Answer 1】:
Use a Spark ML Pipeline in Scala:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}

// Assemble the raw numeric columns into a single vector column.
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("assembled_features")

// Standardize the assembled vector into the "features" column that KMeans reads by default.
val scaler = new StandardScaler()
  .setInputCol("assembled_features")
  .setOutputCol("features")
  .setWithStd(true)
  .setWithMean(false)

val kmeans = new KMeans().setK(2).setSeed(1L)

// create the pipeline
val pipeline = new Pipeline()
  .setStages(Array(assembler, scaler, kmeans))

// Fit the model
val clusterModel = pipeline.fit(train)
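One way the fitted pipeline might then be used — a sketch, assuming KMeans's default "prediction" output column:

import org.apache.spark.ml.evaluation.ClusteringEvaluator

// The fitted pipeline appends a "prediction" column holding the cluster index.
val predictions = clusterModel.transform(train)

val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")     // the scaled vector column produced by the pipeline
  .setPredictionCol("prediction") // KMeans's default output column

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")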