值 toDF 不是 org.apache.spark.rdd.RDD 的成员

Posted

技术标签:

【中文标题】值 toDF 不是 org.apache.spark.rdd.RDD 的成员【英文标题】:value toDF is not a member of org.apache.spark.rdd.RDD 【发布时间】:2016-03-17 09:00:22 【问题描述】:

我在其他 SO 帖子中读到过这个问题,但我仍然不知道我做错了什么。原则上,添加这两行:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

应该已经成功了,但错误仍然存​​在

这是我的 build.sbt:

name := "PickACustomer"

version := "1.0"

scalaVersion := "2.11.7"


libraryDependencies ++= Seq("com.databricks" %% "spark-avro" % "2.0.1",
"org.apache.spark" %% "spark-sql" % "1.6.0",
"org.apache.spark" %% "spark-core" % "1.6.0")

我的 scala 代码是:

import scala.collection.mutable.Map
import scala.collection.immutable.Vector

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._


    object Foo

    def reshuffle_rdd(rawText: RDD[String]): RDD[Map[String, (Vector[(Double, Double, String)], Map[String, Double])]]  = ...

    def do_prediction(shuffled:RDD[Map[String, (Vector[(Double, Double, String)], Map[String, Double])]], prediction:(Vector[(Double, Double, String)] => Map[String, Double]) ) : RDD[Map[String, Double]] = ...

    def get_match_rate_from_results(results : RDD[Map[String, Double]]) : Map[String, Double]  = ...


    def retrieve_duid(element: Map[String,(Vector[(Double, Double, String)], Map[String,Double])]): Double = ...




    def main(args: Array[String])
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
        if (!conf.getOption("spark.master").isDefined) conf.setMaster("local")

        val sc = new SparkContext(conf)

        //This should do the trick
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._

        val PATH_FILE = "/mnt/fast_export_file_clean.csv"
        val rawText = sc.textFile(PATH_FILE)
        val shuffled = reshuffle_rdd(rawText)

        // PREDICT AS A FUNCTION OF THE LAST SEEN UID
        val results = do_prediction(shuffled.filter(x => retrieve_duid(x) > 1) , predict_as_last_uid)
        results.cache()

        case class Summary(ismatch: Double, t_to_last:Double, nflips:Double,d_uid: Double, truth:Double, guess:Double)

        val summary = results.map(x => Summary(x("match"), x("t_to_last"), x("nflips"), x("d_uid"), x("truth"), x("guess")))


        //PROBLEMATIC LINE
        val sum_df = summary.toDF()

    
    

我总是得到:

值 toDF 不是 org.apache.spark.rdd.RDD[摘要] 的成员

现在有点丢了。有什么想法吗?

【问题讨论】:

你能至少输入你的值并给我们所用方法的定义吗? 其他对象中的案例类可能仍然会导致序列化问题。 实际上,您可以在 main 内部使用以下内容来总结您的代码: val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val summary: RDD[Summary ] = ??? case class Summary(imatch: Double, t_to_last: Double, nflips: Double, d_uid: Double, truth: Double, guess: Double) val sum_df = summary.toDF() 很遗憾,我无法重现错误...... 不需要整个代码。有时在没有实际应用值的情况下定义要使用的值就足够了,但在这种情况下,键入很重要。尝试运行sbt update 【参考方案1】:

将您的案例类移到main之外:

object Foo 

  case class Summary(ismatch: Double, t_to_last:Double, nflips:Double,d_uid: Double, truth:Double, guess:Double)

  def main(args: Array[String])
    ...
  


它的作用域使 Spark 无法处理 Summary 的架构的自动派生。仅供参考,我实际上得到了与 sbt 不同的错误:

没有可用于摘要的 TypeTag

【讨论】:

以防万一这有助于某人 - 将类移到对象之外会导致类似的问题。该类必须在 foo 内部和 main 外部【参考方案2】:

太棒了。救我一命

将您的案例类移到 main 之外:

object Foo 

    case class Summary(ismatch: Double, t_to_last:Double, nflips:Double,d_uid: Double, truth:Double, guess:Double)

    def main(args: Array[String])
...
    

【讨论】:

【参考方案3】:

将您的案例类移到函数体之外。然后使用 import spark.implicits._.

【讨论】:

以上是关于值 toDF 不是 org.apache.spark.rdd.RDD 的成员的主要内容,如果未能解决你的问题,请参考以下文章

错误:值 toDF 不是 org.apache.spark.rdd.RDD[org.apache.kafka.clients.consumer.ConsumerRecord[String,String

值 toDF 不是 org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] 的成员

toDF 不是 Seq 的成员,toDS 不是 Seq 的成员

在 Spark 中创建数据帧时出错

spark.implicits._ 导入了啥?

Spark2.1.0单机模式无法启动master的问题