在顶部/使用 Spark 保存和加载 JSON 和 scala 的对象
Posted
技术标签:
【中文标题】在顶部/使用 Spark 保存和加载 JSON 和 scala 的对象【英文标题】:Save and load JSON and scala's objects on-top/with Spark 【发布时间】:2022-01-13 15:10:55 【问题描述】:我在使用 spark 读取和写入文件到“远程”文件系统(例如 hadoop)时遇到问题。
内容
-
我在本地做了什么?
我想在“远程”上做什么?
1。我在本地做了什么?
就目前而言,我在本地使用 spark - 向我的设备读取和写入文件,如下所示:
Spark-Session 初始化:
val spark: SparkSession = Try(
SparkSession.builder()
.master("local[*]")
.appName("app")
.getOrCreate()) match
case Success(session)=>session
case Failure(exception)=> throw new Exception(s"Failed initializing spark, due to: $exception.getMessage")
在本地保存/写入,然后加载/读取:
(Json 文件)
val content = ""a": 10, "b": [], "c": "x": "1", "z": , "x": "2", "z": " // dummy JSON as string
val fileName = "full_path/sample.json"
// ... verify directory exists and create it if not ...
// write sample.json with the content above:
new PrintWriter(fileName)
write(content)
close()
// Read & Operate on it:
val jsonAsBufferedSource = Source.fromFile(fileName)
(Scala 的案例类)
case class Dummy(string: String, i: Int) extends Serializable
val content = Dummy("42 is the best number", 42) // Dummy instance
val fileName = "full_path/sample.dummy" // 'dummy' is the serialized saved-object name.
// ... verify directory exists and create it if not ...
// Write it:
val output = new ObjectOutputStream(new FileOutputStream(fileName))
output.writeObject(content)
output.close()
// Read:
val input = new ObjectInputStream(new FileInputStream(fileName))
val dummyObject = input.readObject.asInstanceOf[Dummy]
input.close()
// Operate:
dummyObject.i // 42
2。我想在“远程”上做什么?
我希望能够使用 spark 在 HDFS、S3 或任何其他可用的“远程”文件系统上读取/写入,就像我在本地所做的那样。
我的问题主要是:
Spark 配置:应该更改什么以及如何更改? [大师等..] 使用 Spark: 如何像在本地一样保存和加载可序列化对象? 如何保存 Json 字符串,并将其作为 BufferedSource 加载?一般来说 - 我想让自己在本地/远程使用我的应用程序的相同“内部接口”工作。
感谢您的阅读!
编辑
我希望我的应用程序在测试和调试时将文件保存/读取到磁盘并在我的计算机磁盘上工作。我希望它在生产时使用远程文件系统保存/读取。 是否可以使用相同的火花方法?使用什么火花配置?
欧伦
【问题讨论】:
请编辑问题以将其限制为具有足够详细信息的特定问题,以确定适当的答案。 【参考方案1】:不确定我是否理解这个问题。 Spark 与 file:// 和 hdfs:// 或 s3a:// 前缀相同。错误的是 Source.fromFile 和 PrintWriter
您需要重写函数以使用正确的 Spark 方法,因为 Spark 旨在在集群中运行,而不是与一台机器隔离(称为驱动程序)
// read all JSON files in a folder
val df = spark.read.json("file:///path/to/full_path/")
// write the dataframe to HDFS folder
df.write.format("json").save("hdfs://namenode.fqdn:port/hdfs/path/")
当然,您可以序列化一个类,“本地”写入文件(deploy-mode=cluster
时将是“远程”),然后上传那个,但这看起来不像你在这里做。而不是这样做,您将parellelize
序列化对象的Seq
。
使用 json4s
而不是 ObjectOutputStream 从案例类中获取 JSON。
【讨论】:
谢谢!我会尽量说清楚 - 我希望我的应用程序能够远程工作(在集群中,可以是 hdfs/s3a 或其他),并且在我的设备上没有集群的情况下工作,用于“家庭使用”。直到现在 - 我使用上述方法作为“本地 [*]”与我的主人一起工作,没有使用 spark 来读/写(但只是在一些 DF 上操作)。有什么方法可以使用相同的 spark 方法在我的设备(本地)和远程集群上读取/写入文件 - 仅向系统提供不同的 spark 配置?再次感谢! 如上所述,那些“本地方法”不是 Spark 方法,必须更改 是否甚至可以在本地保存/加载文件(json、序列化案例类),并以与远程工作相同的方式对它们进行操作?我首先转向使用这些方法,因为 spark“不想”将文件保存在设备上 - 所以我只想以两种方式实现它(用于“本地”使用和“远程”)。跨度> 再次,使用file://
前缀对 Spark 执行器的本地文件进行操作
谢谢@OneCricketeer。只要确保 - 使用 file:// 将允许我在我的磁盘上保存和加载 csv/json 文件?我希望我的应用在调试/测试时在磁盘上运行,并且在生产中的集群之上运行。以上是关于在顶部/使用 Spark 保存和加载 JSON 和 scala 的对象的主要内容,如果未能解决你的问题,请参考以下文章
在 Spark DataFrame 上保存到 JSON 并重新加载,模式列序列发生变化
使用 spark 展平嵌套的 json 文档并加载到 Elasticsearch