在使用 Scala 中的 Spark 写入 JSON 格式之前,在每行前面添加一个新行

Posted

技术标签:

【中文标题】在使用 Scala 中的 Spark 写入 JSON 格式之前,在每行前面添加一个新行【英文标题】:Add a new line in front of each line before writing to JSON format using Spark in Scala 【发布时间】:2021-08-11 12:24:26 【问题描述】:

我想在我的每个 json 文档前面添加一个新行,然后 Spark 将其写入我的 s3 存储桶:

df.createOrReplaceTempView("ParquetTable")
val parkSQL = spark.sql("select LAST_MODIFIED_BY, LAST_MODIFIED_DATE, NVL(CLASS_NAME, className) as CLASS_NAME, DECISION, TASK_TYPE_ID from ParquetTable")
parkSQL.show(false)
parkSQL.count()

parkSQL.write.json("s3://test-bucket/json-output-7/")

仅使用此命令,它将生成具有以下内容的文件:

"LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"
"LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"

但是,我想要实现的是如下所示:

"index":
"LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"
"index":
"LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"

任何有关如何实现此结果的见解将不胜感激!

【问题讨论】:

您想添加"index":的任何具体原因? 这样我就可以针对 Elasticsearch 调用 bulk 加载 API,例如 ***.com/questions/45601344/… 【参考方案1】:

下面的代码会将"index":DataFrame中的现有行数据连接起来,并将数据转换为json,然后使用text格式保存json数据。

df
.select(
    lit(""""index":""").as("index"),
    to_json(struct($"*")).as("json_data")
)
.select(
    concat_ws(
        "\n", // This will split index column & other column data into two lines.
        $"index",
        $"json_data"
    ).as("data")
)
.write
.format("text") // This is required.
.save("s3://test-bucket/json-output-7/")

最终输出

cat part-00000-24619b28-6501-4763-b3de-1a2f72a5a4ec-c000.txt

"index":
"CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"
"index":
"CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"

【讨论】:

谢谢,不过我需要它们采用.json 格式。 oh.. 我认为不可能使用json 格式,因为如果您使用json 格式,最终输出数据将是stringified nvm,你是对的,使用你的命令,我能够将这个 .txt 文件批量加载到我的 Elasticsearch 中,再次感谢!

以上是关于在使用 Scala 中的 Spark 写入 JSON 格式之前,在每行前面添加一个新行的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 spark(scala)读取和写入(更新)同一个文件

使用 scala 从 spark 中删除 bigquery 表

scala实战之spark源码修改(能够将DataFrame按字段增量写入mysql数据表)

仅从 Spark Scala DataFrame 写入标头 CSV 记录

将 scala/spark 信息写入 MongoDB

在顶部/使用 Spark 保存和加载 JSON 和 scala 的对象