在顶部/使用 Spark 保存和加载 JSON 和 scala 的对象

Posted

技术标签:

【中文标题】在顶部/使用 Spark 保存和加载 JSON 和 scala 的对象【英文标题】:Save and load JSON and scala's objects on-top/with Spark 【发布时间】:2022-01-13 15:10:55 【问题描述】:

我在使用 spark 读取和写入文件到“远程”文件系统(例如 hadoop)时遇到问题。

内容

    我在本地做了什么? 我想在“远程”上做什么?

1。我在本地做了什么?

就目前而言,我在本地使用 spark - 向我的设备读取和写入文件,如下所示:

Spark-Session 初始化

  val spark: SparkSession = Try(
    SparkSession.builder()
      .master("local[*]")
      .appName("app")
      .getOrCreate()) match 
    case Success(session)=>session
    case Failure(exception)=> throw new Exception(s"Failed initializing spark, due to: $exception.getMessage")
  

在本地保存/写入,然后加载/读取:

(Json 文件)

  val content = ""a": 10, "b": [], "c": "x": "1", "z": , "x": "2", "z": "  // dummy JSON as string
  val fileName = "full_path/sample.json"

  // ... verify directory exists and create it if not ...

  // write sample.json with the content above:
  new PrintWriter(fileName) 
    write(content)
    close()
  

  // Read & Operate on it:
  val jsonAsBufferedSource = Source.fromFile(fileName)

(Scala 的案例类)

  case class Dummy(string: String, i: Int) extends Serializable 
  val content = Dummy("42 is the best number", 42)       // Dummy instance
  val fileName = "full_path/sample.dummy"               // 'dummy' is the serialized saved-object name.
  
  // ... verify directory exists and create it if not ...

  // Write it:
  val output = new ObjectOutputStream(new FileOutputStream(fileName))
  output.writeObject(content)
  output.close()

  // Read:
  val input = new ObjectInputStream(new FileInputStream(fileName))
  val dummyObject = input.readObject.asInstanceOf[Dummy]
  input.close()

  // Operate:
  dummyObject.i   // 42

2。我想在“远程”上做什么?

我希望能够使用 spark 在 HDFS、S3 或任何其他可用的“远程”文件系统上读取/写入,就像我在本地所做的那样。

我的问题主要是:

Spark 配置:应该更改什么以及如何更改? [大师等..] 使用 Spark: 如何像在本地一样保存和加载可序列化对象? 如何保存 Json 字符串,并将其作为 BufferedSource 加载?

一般来说 - 我想让自己在本地/远程使用我的应用程序的相同“内部接口”工作。

感谢您的阅读!

编辑

我希望我的应用程序在测试和调试时将文件保存/读取到磁盘并在我的计算机磁盘上工作。我希望它在生产时使用远程文件系统保存/读取。 是否可以使用相同的火花方法?使用什么火花配置?

欧伦

【问题讨论】:

请编辑问题以将其限制为具有足够详细信息的特定问题,以确定适当的答案。 【参考方案1】:

不确定我是否理解这个问题。 Spark 与 file:// 和 hdfs:// 或 s3a:// 前缀相同。错误的是 Source.fromFile 和 PrintWriter

您需要重写函数以使用正确的 Spark 方法,因为 Spark 旨在在集群中运行,而不是与一台机器隔离(称为驱动程序)

// read all JSON files in a folder
val df = spark.read.json("file:///path/to/full_path/")

// write the dataframe to HDFS folder
df.write.format("json").save("hdfs://namenode.fqdn:port/hdfs/path/")

当然,您可以序列化一个类,“本地”写入文件(deploy-mode=cluster 时将是“远程”),然后上传那个,但这看起来不像你在这里做。而不是这样做,您将parellelize 序列化对象的Seq

使用 json4s 而不是 ObjectOutputStream 从案例类中获取 JSON。

【讨论】:

谢谢!我会尽量说清楚 - 我希望我的应用程序能够远程工作(在集群中,可以是 hdfs/s3a 或其他),并且在我的设备上没有集群的情况下工作,用于“家庭使用”。直到现在 - 我使用上述方法作为“本地 [*]”与我的主人一起工作,没有使用 spark 来读/写(但只是在一些 DF 上操作)。有什么方法可以使用相同的 spark 方法在我的设备(本地)和远程集群上读取/写入文件 - 仅向系统提供不同的 spark 配置?再次感谢! 如上所述,那些“本地方法”不是 Spark 方法,必须更改 是否甚至可以在本地保存/加载文件(json、序列化案例类),并以与远程工作相同的方式对它们进行操作?我首先转向使用这些方法,因为 spark“不想”将文件保存在设备上 - 所以我只想以两种方式实现它(用于“本地”使用和“远程”)。跨度> 再次,使用file:// 前缀对 Spark 执行器的本地文件进行操作 谢谢@OneCricketeer。只要确保 - 使用 file:// 将允许我在我的磁盘上保存和加载 csv/json 文件?我希望我的应用在调试/测试时在磁盘上运行,并且在生产中的集群之上运行。

以上是关于在顶部/使用 Spark 保存和加载 JSON 和 scala 的对象的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark DataFrame 上保存到 JSON 并重新加载,模式列序列发生变化

spark DataFrame 读写和保存数据

使用 spark 展平嵌套的 json 文档并加载到 Elasticsearch

可以将 mlflow.spark 保存的模型加载为 Spark/Scala 管道吗?

使用 spark 读取和访问 json 文件中的嵌套字段

Spark SQL数据加载和保存实战