RDD toDF() :错误行为
Posted
技术标签:
【中文标题】RDD toDF() :错误行为【英文标题】:RDD toDF() : Erroneous Behavior 【发布时间】:2016-10-27 07:21:05 【问题描述】:我构建了一个 SparkStreaming 应用程序,该应用程序从 Kafka 队列中获取内容,并打算在经过一些预处理和结构化后将数据放入 mysql 表中。
我在 SparkStreamingContext 上调用“foreachRDD”方法。我面临的问题是,当我在 RDD 上调用 saveAsTextFile 和使用格式(“csv”)的 DataFrame 的 write 方法之间存在数据丢失。我似乎无法确定为什么会发生这种情况。
val ssc = new StreamingContext(spark.sparkContext, Seconds(60))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
stream.foreachRDD
rdd =>
rdd.saveAsTextFile("/Users/jarvis/rdds/"+new SimpleDateFormat("hh-mm-ss-dd-MM-yyyy").format(new Date)+"_rdd")
import spark.implicits._
val messagesDF = rdd.map(_.split("\t")).map( w => Record ( w(0), autoTag( w(1),w(4) ) , w(2), w(3), w(4), w(5).substring(w(5).lastIndexOf("http://")), w(6).split("\n")(0) )).toDF("recordTS","tag","channel_url","title","description","link","pub_TS")
messagesDF.write.format("csv").save(dumpPath+new SimpleDateFormat("hh-mm-ss-dd-MM-yyyy").format(new Date)+"_DF")
ssc.start()
ssc.awaitTermination()
存在数据丢失,即许多行无法从 RDD 进入 DataFrame。 还有复制:确实到达 Dataframe 的许多行被复制了很多次。
【问题讨论】:
你可以做的是,先将rdd
转换成df
,然后你可以将相同的DF写入csv
和text
文件。要将 df 保存到文本文件,请尝试 df.write.text("file path")
另外,您可以在写入 CSV 和文本文件之前cache
DF。
【参考方案1】:
发现错误。实际上对摄取的数据格式存在错误的理解。
预期的数据是“\t\t\t...”,因此应该在“\n”处拆分行。
但是实际数据是: "\t\t\t...\n\t\t\t...\n"
因此 rdd.map(...) 操作需要另一个映射来在每个“\n”处进行拆分
【讨论】:
以上是关于RDD toDF() :错误行为的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 toDF() 将自定义 Java 类对象的 RDD 转换为 DataFrame?
值 toDF 不是 org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] 的成员
将 rdd 转换为数据框:AttributeError: 'RDD' object has no attribute 'toDF' using PySpark