在使用 Scala 中的 Spark 写入 JSON 格式之前,在每行前面添加一个新行
Posted
技术标签:
【中文标题】在使用 Scala 中的 Spark 写入 JSON 格式之前,在每行前面添加一个新行【英文标题】:Add a new line in front of each line before writing to JSON format using Spark in Scala 【发布时间】:2021-08-11 12:24:26 【问题描述】:我想在我的每个 json 文档前面添加一个新行,然后 Spark 将其写入我的 s3 存储桶:
df.createOrReplaceTempView("ParquetTable")
val parkSQL = spark.sql("select LAST_MODIFIED_BY, LAST_MODIFIED_DATE, NVL(CLASS_NAME, className) as CLASS_NAME, DECISION, TASK_TYPE_ID from ParquetTable")
parkSQL.show(false)
parkSQL.count()
parkSQL.write.json("s3://test-bucket/json-output-7/")
仅使用此命令,它将生成具有以下内容的文件:
"LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"
"LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"
但是,我想要实现的是如下所示:
"index":
"LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"
"index":
"LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"
任何有关如何实现此结果的见解将不胜感激!
【问题讨论】:
您想添加"index":
的任何具体原因?
这样我就可以针对 Elasticsearch 调用 bulk
加载 API,例如 ***.com/questions/45601344/…
【参考方案1】:
下面的代码会将"index":
与DataFrame
中的现有行数据连接起来,并将数据转换为json
,然后使用text
格式保存json数据。
df
.select(
lit(""""index":""").as("index"),
to_json(struct($"*")).as("json_data")
)
.select(
concat_ws(
"\n", // This will split index column & other column data into two lines.
$"index",
$"json_data"
).as("data")
)
.write
.format("text") // This is required.
.save("s3://test-bucket/json-output-7/")
最终输出
cat part-00000-24619b28-6501-4763-b3de-1a2f72a5a4ec-c000.txt
"index":
"CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"
"index":
"CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"
【讨论】:
谢谢,不过我需要它们采用.json
格式。
oh.. 我认为不可能使用json
格式,因为如果您使用json
格式,最终输出数据将是stringified
。
nvm,你是对的,使用你的命令,我能够将这个 .txt
文件批量加载到我的 Elasticsearch 中,再次感谢!以上是关于在使用 Scala 中的 Spark 写入 JSON 格式之前,在每行前面添加一个新行的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 spark(scala)读取和写入(更新)同一个文件
使用 scala 从 spark 中删除 bigquery 表
scala实战之spark源码修改(能够将DataFrame按字段增量写入mysql数据表)