从 PySpark GroupBy 中的两列创建 JSON 字符串
Posted
技术标签:
【中文标题】从 PySpark GroupBy 中的两列创建 JSON 字符串【英文标题】:Creating JSON String from Two Columns in PySpark GroupBy 【发布时间】:2019-02-18 00:19:38 【问题描述】:我有一个看起来像这样的数据框:
>>> l = [('a', 'foo', 1), ('b', 'bar', 1), ('a', 'biz', 6), ('c', 'bar', 3), ('c', 'biz', 2)]
>>> df = spark.createDataFrame(l, ('uid', 'code', 'level'))
>>> df.show()
+---+----+-----+
|uid|code|level|
+---+----+-----+
| a| foo| 1|
| b| bar| 1|
| a| biz| 6|
| c| bar| 3|
| c| biz| 2|
+---+----+-----+
我要做的是将code
和level
值分组到list
或dict
中,并将该列表转储为JSON 字符串,以便我可以将数据帧保存到磁盘。结果如下:
>>> df.show()
+---+--------------------------+
|uid| json |
+---+--------------------------+
| a| '["foo":1, "biz":6]' |
| b| '["bar":1]' |
| c| '["bar":3, "biz":2]' |
+---+--------------------------+
我对使用 PySpark 还是很陌生,我在弄清楚如何获得这个结果时遇到了很多麻烦。我几乎肯定需要一个groupBy
,我已经尝试通过创建一个名为“json”的新StringType
列然后使用pandas_udf
装饰器来实现这一点,但是我遇到了关于无法使用的类型的错误,因为我我发现,我访问数据的方式是访问整列,而不仅仅是行。
>>> df = df.withColumn('json', F.list(''))
>>> schema = df.schema
>>> @pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
..: def to_json(pdf):
..: return pdf.assign(serial=json.dumps(pdf.code:pdf.level))
我考虑过在两列之间使用字符串连接并使用collect_set
,但这也感觉不对,因为它有可能将无法加载 JSON 的内容写入磁盘,因为它具有字符串表示形式。任何帮助表示赞赏。
【问题讨论】:
【参考方案1】:在这种情况下不需要pandas_udf
。 to_json
、collect_list
和 create_map
应该是你所需要的:
import pyspark.sql.functions as f
df.groupby('uid').agg(
f.to_json(
f.collect_list(
f.create_map('code', 'level')
)
).alias('json')
).show(3, False)
+---+---------------------+
|uid|json |
+---+---------------------+
|c |["bar":3,"biz":2]|
|b |["bar":1] |
|a |["foo":1,"biz":6]|
+---+---------------------+
【讨论】:
如果这里只使用一列,即级别,我想给我自己的 json 结构怎么办?例如,[ first : 3, status : null , second : 2, status : "pending" ]
并保持 uid 列不变。以上是关于从 PySpark GroupBy 中的两列创建 JSON 字符串的主要内容,如果未能解决你的问题,请参考以下文章
在不包括当前行的两列之间使用pandas groupby除法创建一个新列
使用 PySpark 连接与另一列中的两列确定的范围相匹配的数据框