spark: how to merge rows to array of jsons
【Posted】: 2019-03-01 01:53:47
【Question】:
Input:
id1 id2 name value epid
"xxx" "yyy" "EAN" "5057723043" "1299"
"xxx" "yyy" "MPN" "EVBD" "1299"
I want:
"id1": "xxx",
"id2": "yyy",
"item_specifics": [
"name": "EAN",
"value": "5057723043"
,
"name": "MPN",
"value": "EVBD"
,
"name": "EPID",
"value": "1299"
]
Starting from How to aggregate columns into json array? and how to merge rows into column of spark dataframe as vaild json to write it in mysql, I tried the following two approaches:
pi_df.groupBy(col("id1"), col("id2"))
//.agg(collect_list(to_json(struct(col("name"), col("value"))).alias("item_specifics"))) // => not working
.agg(collect_list(struct(col("name"),col("value"))).alias("item_specifics"))
But I got:
"name":"EAN","value":"5057723043", "EPID": "1299", "id1": "xxx", "id2": "yyy"
How can I fix this? Thanks.
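For reference, a minimal sketch (untested, assuming the same pi_df) of applying to_json once over the aggregated structs rather than per element; note that epid is still missing here, which is what the answers below address:

// group the name/value pairs into an array of structs, then serialize the whole row once
pi_df.groupBy(col("id1"), col("id2"))
  .agg(collect_list(struct(col("name"), col("value"))).alias("item_specifics"))
  .withColumn("json", to_json(struct(col("id1"), col("id2"), col("item_specifics"))))
  .show(false)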
【Question comments】:
Can you add sample input?
The output is not valid JSON unless you wrap xxx and yyy in double quotes.
Which Spark version are you using?
Spark 2.3.1 .....
【Answer 1】:
For Spark < 2.4
You can create two dataframes, one with name and value and another with "EPID" as the name and the epid value as the value, and union them. Then aggregate with collect_set and build the JSON. The code should look like this:
//Creating Test Data
val df = Seq(("xxx","yyy" ,"EAN" ,"5057723043","1299"), ("xxx","yyy" ,"MPN" ,"EVBD", "1299") )
.toDF("id1", "id2", "name", "value", "epid")
df.show(false)
+---+---+----+----------+----+
|id1|id2|name|value |epid|
+---+---+----+----------+----+
|xxx|yyy|EAN |5057723043|1299|
|xxx|yyy|MPN |EVBD |1299|
+---+---+----+----------+----+
val df1 = df.withColumn("map", struct(col("name"), col("value")))
.select("id1", "id2", "map")
val df2 = df.withColumn("map", struct(lit("EPID").as("name"), col("epid").as("value")))
.select("id1", "id2", "map")
val jsonDF = df1.union(df2).groupBy("id1", "id2")
.agg(collect_set("map").as("item_specifics"))
.withColumn("json", to_json(struct("id1", "id2", "item_specifics")))
jsonDF.select("json").show(false)
+---------------------------------------------------------------------------------------------------------------------------------------------+
|json |
+---------------------------------------------------------------------------------------------------------------------------------------------+
|"id1":"xxx","id2":"yyy","item_specifics":["name":"MPN","value":"EVBD","name":"EAN","value":"5057723043","name":"EPID","value":"1299"]|
+---------------------------------------------------------------------------------------------------------------------------------------------+
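If the JSON strings need to be persisted rather than just displayed, one option (a sketch; the output path is hypothetical) is to write the single string column out as text files:

// jsonDF.select("json") has exactly one string column, which is what write.text expects
jsonDF.select("json").write.mode("overwrite").text("/tmp/item_specifics_json")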
For Spark >= 2.4
It provides an array_union method. It might help to do this without the union, though I haven't tried it.
val jsonDF = df.withColumn("map1", struct(col("name"), col("value")))
.withColumn("map2", struct(lit("epid").as("name"), col("epid").as("value")))
.groupBy("id1", "id2")
.agg(collect_set("map1").as("item_specifics1"),
collect_set("map2").as("item_specifics2"))
.withColumn("item_specifics", array_union(col("item_specifics1"), col("item_specifics2")))
.withColumn("json", to_json(struct("id1", "id2", "item_specifics2")))
【Comments】:
Thanks. But collect_set cannot remove the duplicates.
collect_set does remove duplicates and keeps only the unique values. Please check the result.
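To see the difference for yourself, a tiny self-contained check (hypothetical data, untested):

val dup = Seq(("a", 1), ("a", 1), ("a", 2)).toDF("k", "v")
dup.groupBy("k")
  .agg(collect_list("v").as("with_dups"), collect_set("v").as("deduped"))
  .show(false)
// with_dups keeps every row, e.g. [1, 1, 2]; deduped keeps only distinct values, e.g. [1, 2] (order not guaranteed)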
【Answer 2】:
You're already very close. I believe you're looking for something like this:
val pi_df2 = pi_df.withColumn("name", lit("EPID")).
  drop("value"). // drop the original value column so the rename below does not create a duplicate column name
  withColumnRenamed("epid", "value").
  select("id1", "id2", "name", "value")
pi_df.select("id1", "id2", "name","value").
union(pi_df2).withColumn("item_specific", struct(col("name"), col("value"))).
groupBy(col("id1"), col("id2")).
agg(collect_list(col("item_specific")).alias("item_specifics")).
write.json(...)
The union should bring epid back into item_specifics.
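If a JSON string column is wanted instead of files on disk, the same aggregation can be wrapped in to_json before (or instead of) the write (a sketch, untested; jsonStrings is a hypothetical name):

val jsonStrings = pi_df.select("id1", "id2", "name", "value").
  union(pi_df2).
  withColumn("item_specific", struct(col("name"), col("value"))).
  groupBy(col("id1"), col("id2")).
  agg(collect_list(col("item_specific")).alias("item_specifics")).
  withColumn("json", to_json(struct(col("id1"), col("id2"), col("item_specifics"))))
jsonStrings.select("json").show(false)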
【Comments】:
Thanks. I tried the first solution and found a lot of duplicate name and value structs in the item_specific column. I don't know why yet, but I'm looking into it. By the way, is it possible to add epid through a udf?
If there are a lot of duplicates, it is probably because the data itself contains duplicates. You can also use collect_set if you don't want them, or call df.distinct() before the groupBy. I don't know of a direct way to add epid via a udf.
Why don't I see epid?
I edited the answer to make it clearer. After the union, epid should appear in item_specifics.
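A short sketch (untested, reusing the pi_df/pi_df2 names from the answer above) of deduplicating before the aggregation, as suggested in the comment:

val deduped = pi_df.select("id1", "id2", "name", "value").union(pi_df2).distinct()
// after distinct(), collect_list no longer picks up duplicated source rows
deduped.withColumn("item_specific", struct(col("name"), col("value"))).
  groupBy(col("id1"), col("id2")).
  agg(collect_list(col("item_specific")).alias("item_specifics")).
  show(false)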
【Answer 3】:
Here is what you need to do:
import scala.util.parsing.json.JSONObject
import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.functions._
//Define udf
val jsonFun = udf((id1: String, id2: String, item_specifics: WrappedArray[Map[String, String]], epid: String) => {
//Add epid to item_specifics json
val item_withEPID = item_specifics :+ Map("epid" -> epid)
val item_specificsArray = item_withEPID.map(m => ( Array(Map("name" -> m.keys.toSeq(0), "value" -> m.values.toSeq(0))))).map(m => m.map( mi => JSONObject(mi).toString().replace("\\",""))).flatten.mkString("[",",","]")
//Add id1 and id2 to output json
val m = Map("id1"-> id1, "id2"-> id2, "item_specifics" -> item_specificsArray.toSeq )
JSONObject(m).toString().replace("\\","")
})
val pi_df = Seq( ("xxx","yyy","EAN","5057723043","1299"), ("xxx","yyy","MPN","EVBD","1299")).toDF("id1","id2","name","value","epid")
//Add epid as part of group by column else the column will not be available after group by and aggregation
val df = pi_df.groupBy(col("id1"), col("id2"), col("epid")).agg(collect_list(map(col("name"), col("value")) as "map").as("item_specifics")).withColumn("item_specifics",jsonFun($"id1",$"id2",$"item_specifics",$"epid"))
df.show(false)
scala> df.show(false)
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id1|id2|epid|item_specifics |
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|xxx|yyy|1299|{"id1" : "xxx", "id2" : "yyy", "item_specifics" : [{"name" : "MPN", "value" : "EVBD"},{"name" : "EAN", "value" : "5057723043"},{"name" : "epid", "value" : "1299"}]}|
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Content of the item_specifics column / output:
"id1": "xxx",
"id2": "yyy",
"item_specifics": [
"name": "MPN",
"value": "EVBD"
,
"name": "EAN",
"value": "5057723043"
,
"name": "epid",
"value": "1299"
]
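Since epid only has to appear in the groupBy so the UDF can read it, the final selection can keep just the JSON column (a small follow-up sketch, untested, using the df from above):

df.select("item_specifics").show(false)
// or keep the grouping keys alongside the JSON string:
df.drop("epid").show(false)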
【Comments】: