RDD toDF() :错误行为

Posted

技术标签:

【中文标题】RDD toDF() :错误行为【英文标题】:RDD toDF() : Erroneous Behavior 【发布时间】:2016-10-27 07:21:05 【问题描述】:

我构建了一个 SparkStreaming 应用程序,该应用程序从 Kafka 队列中获取内容,并打算在经过一些预处理和结构化后将数据放入 mysql 表中。

我在 SparkStreamingContext 上调用“foreachRDD”方法。我面临的问题是,当我在 RDD 上调用 saveAsTextFile 和使用格式(“csv”)的 DataFrame 的 write 方法之间存在数据丢失。我似乎无法确定为什么会发生这种情况。

val ssc = new StreamingContext(spark.sparkContext, Seconds(60))
ssc.checkpoint("checkpoint")

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    stream.foreachRDD 
     rdd => 
     rdd.saveAsTextFile("/Users/jarvis/rdds/"+new SimpleDateFormat("hh-mm-ss-dd-MM-yyyy").format(new Date)+"_rdd")

     import spark.implicits._

      val messagesDF = rdd.map(_.split("\t")).map( w =>  Record ( w(0), autoTag( w(1),w(4) ) , w(2), w(3), w(4), w(5).substring(w(5).lastIndexOf("http://")), w(6).split("\n")(0) )).toDF("recordTS","tag","channel_url","title","description","link","pub_TS")

      messagesDF.write.format("csv").save(dumpPath+new SimpleDateFormat("hh-mm-ss-dd-MM-yyyy").format(new Date)+"_DF")
      
    

    ssc.start()
    ssc.awaitTermination()

存在数据丢失,即许多行无法从 RDD 进入 DataFrame。 还有复制:确实到达 Dataframe 的许多行被复制了很多次。

【问题讨论】:

你可以做的是,先将rdd转换成df,然后你可以将相同的DF写入csvtext文件。要将 df 保存到文本文件,请尝试 df.write.text("file path") 另外,您可以在写入 CSV 和文本文件之前cache DF。 【参考方案1】:

发现错误。实际上对摄取的数据格式存在错误的理解。

预期的数据是“\t\t\t...”,因此应该在“\n”处拆分行。

但是实际数据是: "\t\t\t...\n\t\t\t...\n"

因此 rdd.map(...) 操作需要另一个映射来在每个“\n”处进行拆分

【讨论】:

以上是关于RDD toDF() :错误行为的主要内容,如果未能解决你的问题,请参考以下文章

toDF() 不处理 RDD

如何使用 toDF() 将自定义 Java 类对象的 RDD 转换为 DataFrame?

值 toDF 不是 org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] 的成员

将 rdd 转换为数据框:AttributeError: 'RDD' object has no attribute 'toDF' using PySpark

值 toDF 不是成员 org.apache.spark.rdd.RDD

值 toDF 不是 org.apache.spark.rdd.RDD 的成员