type mismatch; found: org.apache.spark.sql.DataFrame required: org.apache.spark.rdd.RDD

[Posted]: 2019-04-30 23:16:09

[Question]:

I am new to Scala and MLlib and I am running into the following error. Please let me know if anyone has solved a similar problem.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
.
.
.
val conf = new SparkConf().setMaster("local").setAppName("SampleApp")
val sContext = new SparkContext(conf)
val sc = SparkSession.builder().master("local").appName("SampleApp").getOrCreate()
val sampleData = sc.read.json("input/sampleData.json")
val clusters = KMeans.train(sampleData, 10, 10)
val WSSSE = clusters.computeCost(sampleData)
clusters.save(sContext, "target/org/apache/spark/KMeansExample/KMeansModel") // mllib save() takes the SparkContext, not the SparkSession
val sameModel = KMeansModel.load(sContext, "target/org/apache/spark/KMeansExample/KMeansModel")

The KMeans.train line above gives this error:

type mismatch;
 found   : org.apache.spark.sql.DataFrame
    (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
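(For reference, the RDD-based KMeans.train only accepts RDD[org.apache.spark.mllib.linalg.Vector], so the DataFrame would first have to be mapped into vectors. A minimal sketch, assuming the numeric columns used in the EDIT further down:)

// sketch: turn numeric DataFrame columns into the RDD[Vector] that the
// RDD-based KMeans.train expects ("review_count" and "stars" assumed numeric)
val vectorRDD = sampleData
  .select("review_count", "stars")
  .rdd
  .map(row => Vectors.dense(row.toSeq.map(_.toString.toDouble).toArray))
val clusters = KMeans.train(vectorRDD, 10, 10)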

So I tried the DataFrame-based spark.ml API instead:

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
val kmeans = new KMeans().setK(20)
val model = kmeans.fit(sampleData)
val predictions = model.transform(sampleData)
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)

This gives the error:

Exception in thread "main" java.lang.IllegalArgumentException: Field "features" does not exist.
Available fields: address, attributes, business_id
at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.spark.sql.types.StructType.apply(StructType.scala:266)
at org.apache.spark.ml.util.SchemaUtils$.checkColumnTypes(SchemaUtils.scala:58)
at org.apache.spark.ml.util.SchemaUtils$.validateVectorCompatibleColumn(SchemaUtils.scala:119)
at org.apache.spark.ml.clustering.KMeansParams$class.validateAndTransformSchema(KMeans.scala:96)
at org.apache.spark.ml.clustering.KMeans.validateAndTransformSchema(KMeans.scala:285)
at org.apache.spark.ml.clustering.KMeans.transformSchema(KMeans.scala:382)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:341)
at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:340)
at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:340)

I have been referring to https://spark.apache.org/docs/latest/ml-clustering.html and https://spark.apache.org/docs/latest/mllib-clustering.html.

EDIT

Using setFeaturesCol():

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
  .setInputCols(Array("is_open", "review_count", "stars"))
  .setOutputCol("features")
val output = assembler.transform(sampleData).select("features")
val kmeans = new KMeans().setK(20).setFeaturesCol("features")
val model = kmeans.fit(output)
val predictions = model.transform(output) // transform the assembled data ("output"), not the raw sampleData
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

This still produces a different error:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.Utils$.getSimpleName(Ljava/lang/Class;)Ljava/lang/String;
at org.apache.spark.ml.util.Instrumentation.logPipelineStage(Instrumentation.scala:52)
at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:350)
at org.apache.spark.ml.clustering.KMeans$$anonfun$fit$1.apply(KMeans.scala:340)
at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:340)
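(A NoSuchMethodError on an internal Spark class like org.apache.spark.util.Utils usually points to mismatched Spark artifact versions on the classpath, e.g. spark-core from one release and spark-mllib from another. A sketch of a consistent sbt setup, with an illustrative version number:)

// build.sbt sketch: pin every Spark artifact to the same version so that
// internal APIs such as Utils.getSimpleName exist at runtime
val sparkVersion = "2.4.3" // illustrative; use whichever release you target
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % sparkVersion,
  "org.apache.spark" %% "spark-sql"   % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)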

Thanks.

[Comments]:

Please read What's the difference between Spark ML and MLLIB packages and pick one or the other, with the matching distributed data structure.

Use the newer ml library (as in your second attempt). For KMeans you need to specify which column to use as input; it defaults to "features", hence the Field "features" does not exist error. Do this with setFeaturesCol().

[Answer 1]:

Use a Scala Pipeline:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}

// assemble the raw feature columns into a single vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("assembled_features")

// scale the assembled vector into the "features" column KMeans reads by default
val scaler = new StandardScaler()
  .setInputCol("assembled_features")
  .setOutputCol("features")
  .setWithStd(true)
  .setWithMean(false)

val kmeans = new KMeans().setK(2).setSeed(1L)

// create the pipeline
val pipeline = new Pipeline()
  .setStages(Array(assembler, scaler, kmeans))

// fit the model ("train" is the input DataFrame holding the feature columns)
val clusterModel = pipeline.fit(train)
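To score and evaluate with the fitted pipeline, something like the following should work (a sketch; `train` again stands in for whatever DataFrame holds the feature columns):

import org.apache.spark.ml.evaluation.ClusteringEvaluator

// run the whole pipeline (assemble -> scale -> cluster) over the data
val predictions = clusterModel.transform(train)

// ClusteringEvaluator reads the "features" and "prediction" columns
// that the scaler and KMeans stages produce by default
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")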

