Spark-Scala:另存为 csv 文件(RDD)[重复]

Posted

技术标签:

【中文标题】Spark-Scala:另存为 csv 文件(RDD)[重复]【英文标题】:Spark-Scala: save as csv file (RDD) [duplicate] 【发布时间】:2017-10-07 04:15:38 【问题描述】:

我曾尝试使用 Apache Spark 流式传输 twitter 数据,我想将流式传输数据保存为 csv 文件,但我不能 如何修复我的代码以在 csv 中获取它

我使用 RDD。

这是我的主要代码:

val ssc = new StreamingContext(conf, Seconds(3600))
val stream = TwitterUtils.createStream(ssc, None, filters)

val tweets = stream.map(t => 
  Map(
    // This is for tweet
    "text" -> t.getText,
    "retweet_count" -> t.getRetweetCount,
    "favorited" -> t.isFavorited,
    "truncated" -> t.isTruncated,
    "id_str" -> t.getId,
    "in_reply_to_screen_name" -> t.getInReplyToScreenName,
    "source" -> t.getSource,
    "retweeted" -> t.isRetweetedByMe,
    "created_at" -> t.getCreatedAt,
    "in_reply_to_status_id_str" -> t.getInReplyToStatusId,
    "in_reply_to_user_id_str" -> t.getInReplyToUserId,

    // This is for tweet's user
    "listed_count" -> t.getUser.getListedCount,
    "verified" -> t.getUser.isVerified,
    "location" -> t.getUser.getLocation,
    "user_id_str" -> t.getUser.getId,
    "description" -> t.getUser.getDescription,
    "geo_enabled" -> t.getUser.isGeoEnabled,
    "user_created_at" -> t.getUser.getCreatedAt,
    "statuses_count" -> t.getUser.getStatusesCount,
    "followers_count" -> t.getUser.getFollowersCount,
    "favorites_count" -> t.getUser.getFavouritesCount,
    "protected" -> t.getUser.isProtected,
    "user_url" -> t.getUser.getURL,
    "name" -> t.getUser.getName,
    "time_zone" -> t.getUser.getTimeZone,
    "user_lang" -> t.getUser.getLang,
    "utc_offset" -> t.getUser.getUtcOffset,
    "friends_count" -> t.getUser.getFriendsCount,
    "screen_name" -> t.getUser.getScreenName
  )
)

tweets.repartition(1).saveAsTextFiles("~/streaming/tweets")

【问题讨论】:

你有 RDD 还是 Dataset/frame。如果是后者,则有一种将 CSV 作为格式选项的写入方法 @cricket_007 这是一个 RDD @user8371915 不,我使用的是 RDD 而不是 DF 【参考方案1】:

您需要将 RDD[Map[String, String]] 的推文转换为数据框以保存为 CSV。原因很简单,RDD 没有模式。而 csv 格式具有特定的模式。因此,您必须将 RDD 转换为具有架构的数据框。

有几种方法可以做到这一点。一种方法是使用案例类而不是将数据放入地图中。

 case class(text:String, retweetCount:Int ...)

现在用适当的参数来实例化案例类,而不是 Map(...)。

最后使用 spark 隐式转换将推文转换为数据帧

import spark.implicits._
tweets.toDF.write.csv(...) // saves as CSV

或者,您可以使用here 给出的解决方案将地图转换为数据框

【讨论】:

推文地图内部或之后的这些变化? 我尝试使用该方法将 Map 转换为数据框,它可以工作,但如何分配 twitter 信息(getText、getRetweetCount、...)? 终于成功了,谢谢

以上是关于Spark-Scala:另存为 csv 文件(RDD)[重复]的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Excel 文件另存为 CSV? [复制]

Applescript Excel 2016 另存为 CSV

在javascript中另存为csv文件

excel另存为csv时,身份证号后4位全部变成0了, 怎么解决

从PCAP文件中提取时间并另存为CSV文件

将Excel文件另存为CSV