如何在pyspark中将分组数据存储到json中
Posted
技术标签:
【中文标题】如何在pyspark中将分组数据存储到json中【英文标题】:how to store grouped data into json in pyspark 【发布时间】:2016-02-27 17:58:13 【问题描述】:我是 pyspark 的新手
我有一个看起来像的数据集(只是几列的快照)
我想按键分组我的数据。我的钥匙是
CONCAT(a.div_nbr,a.cust_nbr)
我的最终目标是将数据转换成 JSON,格式如下
k1[v1,v2,....,v1,v2,....], k2[v1,v2,....,v1,v2,....],....
例如
248138339 [ PRECIMA_ID:SCP 00248 0000138339, PROD_NBR:5553505, PROD_DESC:Shot and a Beer Battered Onion Rings (5553505 and 9285840) , PROD_BRND:Molly's Kitchen,PACK_SIZE:4/2.5 LB, QTY_UOM:CA ,
PRECIMA_ID:SCP 00248 0000138339 , PROD_NBR:6659079 , PROD_DESC:Beef Chuck Short Rib Slices, PROD_BRND:Stockyards , PACK_SIZE:12 LBA , QTY_UOM:CA ,...,..., ],
1384611034793[,,],....
我创建了一个数据框(我加入两个表基本上是为了获得更多字段)
joinstmt = sqlContext.sql(
"SELECT a.precima_id , CONCAT(a.div_nbr,a.cust_nbr) as
key,a.prod_nbr , a.prod_desc,a.prod_brnd , a.pack_size , a.qty_uom , a.sales_opp , a.prc_guidance , a.pim_mrch_ctgry_desc , a.pim_mrch_ctgry_id , b.start_date,b.end_date
FROM scoop_dtl a join scoop_hdr b on (a.precima_id =b.precima_id)")
现在,为了得到上述结果,我需要根据 key 对结果进行分组,我做了以下操作
groupbydf = joinstmt.groupBy("key")
这导致 intp 是一个分组数据,阅读后我知道我不能直接使用它,我需要将它转换回数据帧来存储它。
我是新手,需要一些帮助才能将其转换回数据帧,或者如果还有其他方法,我将不胜感激。
【问题讨论】:
【参考方案1】:如果您加入的数据框如下所示:
gender age
M 5
F 50
M 10
M 10
F 10
然后您可以使用下面的代码来获得所需的输出
joinedDF.groupBy("gender") \
.agg(collect_list("age").alias("ages")) \
.write.json("jsonOutput.txt")
输出如下所示:
"gender":"F","ages":[50,10]
"gender":"M","ages":[5,10,10]
如果您有多个列,例如姓名、薪水。您可以添加如下列:
df.groupBy("gender")
.agg(collect_list("age").alias("ages"),collect_list("name").alias("names"))
您的输出将如下所示:
"gender":"F","ages":[50,10],"names":["ankit","abhay"]
"gender":"M","ages":[5,10,10],"names":["snchit","mohit","rohit"]
【讨论】:
谢谢 - 根据操作员的问题,我们如何将您的解决方案扩展到具有更多字段的数据?例如。如果joinedDF包含['gender': 'M', 'name': 'kelly', 'age': 20, 'gender': M, 'name': 'bob', 'age': 41] ,然后在按“性别”分组时,我们实现: 'gender': 'M', 'names':['kelly', 'bob'], 'ages': [20, 41] 更新了我的答案。希望有帮助。 但是收集到的列表项是有序的吗?例如在您的示例中,50 岁对应于 ankit,10 岁对应于 abhay?【参考方案2】:您不能直接使用GroupedData
。它必须先聚合。它可以通过使用 collect_list
等内置函数的聚合来部分覆盖,但使用 DataFrameWriter
使用用于表示键的值根本不可能实现所需的输出。
In 可以尝试这样的事情:
from pyspark.sql import Row
import json
def make_json(kvs):
k, vs = kvs
return json.dumps(k[0]: list(vs))
(df.select(struct(*keys), values)
.rdd
.mapValues(Row.asDict)
.groupByKey()
.map(make_json))
和saveAsTextFile
。
【讨论】:
澄清问题:OP 示例中的变量“kvs”、“*keys”和“values”分别对应什么?以上是关于如何在pyspark中将分组数据存储到json中的主要内容,如果未能解决你的问题,请参考以下文章