Scala - 将 RDD[String] 转换为地图
Posted
技术标签:
【中文标题】Scala - 将 RDD[String] 转换为地图【英文标题】:Scala - Turning RDD[String] into a Map 【发布时间】:2022-01-12 00:43:20 【问题描述】:我有一个非常大的文件,其中包含我想遍历的单个 JSON,使用 Jackson 库将每个 JSON 转换为 Map:
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.module.scala.DefaultScalaModule
import com.fasterxml.module.scala.ScalaObjectMapper
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.register(DefaultScalaModule)
val lines = sc.textFile(fileName)
在单个 JSON 字符串上,我可以毫无问题地执行:
mapper.readValue[Map[String, Object]](JSONString)
获取我的地图。
但是,如果我通过像这样迭代 RDD[String] 来尝试以下操作,则会收到以下错误:
lines.foreach(line=> mapper.readValue[Map[String, Object]])
org.apache.Spark.SparkException: Task not serializable
我可以做 lines.take(10000) 左右,然后处理它,但是这个文件太大了,我不能一次“获取”或“收集”整个文件,我希望能够使用跨所有不同大小文件的相同解决方案。
字符串变成 Map 后,我需要对其执行函数并写入字符串,因此任何允许我在不超出分配内存的情况下执行此操作的解决方案都会有所帮助。谢谢!
【问题讨论】:
为什么不直接使用 Spark 的内置功能来读取JSON
文件?
文本文件是单独的 json 附加在一起。如果我以 sc.textFile(fileName) 的形式加入,我可以将每个单独的 JSON 文件作为字符串进行迭代并执行类似 println 的操作,并且我想继续将其视为字符串。当它存在一个 rdd 时,我无法将它变成地图,但如果我将它变成带有 take 或 collect 的数组,我可以。我的问题是它太大了,我不知道为什么我可以在 Array[String] 而不是 rdd[String] 上执行功能
“我不知道为什么我可以在 Array[String] 而不是 rdd[String] 上执行功能” 因为谁知道使用反射是什么黑魔法jackson 这样做使任务不可序列化(在集群中运行任务至关重要);另外,无论如何Map[String, Object]
是一个极其不安全的数据结构。 - 同样,如果您的文件每行是 JSON
,那么您可以直接使用 spark 读取它。
我确实有办法将其放入更安全的数据结构中。但是你能告诉我如何用开箱即用的火花来阅读它吗?我不想要数据框,无论是字符串还是地图。谢谢
DataFrame
有什么问题?或者将其转换为正确代表您的数据的Dataset[SomeCaseClass]
怎么样? - 无论如何,如果您坚持走这条路,那么我建议您查看其他 JSON 库,例如 circe,它不应该有这个可序列化的问题。
【参考方案1】:
通过以下方法解决了这个问题:
import scala.util.parsing.json._
val myMap = JSON.parseFull(jsonString).get.asInstanceOf[Map[String, Object]]
以上内容适用于 RDD[String]
【讨论】:
scala.util.parsing.json._
在我 5 年前开始使用 Scala 之前就已被弃用。您正在使用一个非常旧的 Scala 版本,因此也是一个非常旧的 Spark 版本......您真的应该升级并使用最佳实践。以上是关于Scala - 将 RDD[String] 转换为地图的主要内容,如果未能解决你的问题,请参考以下文章
在 Spark Scala 中将 RDD[(String, String, String)] 转换为 RDD[(String, (String, String))]