从 PySpark GroupBy 中的两列创建 JSON 字符串

Posted

技术标签:

【中文标题】从 PySpark GroupBy 中的两列创建 JSON 字符串【英文标题】:Creating JSON String from Two Columns in PySpark GroupBy 【发布时间】:2019-02-18 00:19:38 【问题描述】:

我有一个看起来像这样的数据框:

>>> l = [('a', 'foo', 1), ('b', 'bar', 1), ('a', 'biz', 6), ('c', 'bar', 3), ('c', 'biz', 2)]
>>> df = spark.createDataFrame(l, ('uid', 'code', 'level')) 
>>> df.show()
+---+----+-----+
|uid|code|level|
+---+----+-----+
|  a| foo|    1|
|  b| bar|    1|
|  a| biz|    6|
|  c| bar|    3|
|  c| biz|    2|
+---+----+-----+

我要做的是将codelevel 值分组到listdict 中,并将该列表转储为JSON 字符串,以便我可以将数据帧保存到磁盘。结果如下:

>>> df.show()
+---+--------------------------+
|uid|           json           |
+---+--------------------------+
|  a| '["foo":1, "biz":6]' |
|  b| '["bar":1]'            |
|  c| '["bar":3, "biz":2]' |
+---+--------------------------+

我对使用 PySpark 还是很陌生,我在弄清楚如何获得这个结果时遇到了很多麻烦。我几乎肯定需要一个groupBy,我已经尝试通过创建一个名为“json”的新StringType 列然后使用pandas_udf 装饰器来实现这一点,但是我遇到了关于无法使用的类型的错误,因为我我发现,我访问数据的方式是访问整列,而不仅仅是行。

>>> df = df.withColumn('json', F.list(''))
>>> schema = df.schema
>>> @pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
..: def to_json(pdf):
..:     return pdf.assign(serial=json.dumps(pdf.code:pdf.level))

我考虑过在两列之间使用字符串连接并使用collect_set,但这也感觉不对,因为它有可能将无法加载 JSON 的内容写入磁盘,因为它具有字符串表示形式。任何帮助表示赞赏。

【问题讨论】:

【参考方案1】:

在这种情况下不需要pandas_udfto_jsoncollect_listcreate_map 应该是你所需要的:

import pyspark.sql.functions as f

df.groupby('uid').agg(
  f.to_json(
    f.collect_list(
      f.create_map('code', 'level')
    )
  ).alias('json')
).show(3, False)
+---+---------------------+
|uid|json                 |
+---+---------------------+
|c  |["bar":3,"biz":2]|
|b  |["bar":1]          |
|a  |["foo":1,"biz":6]|
+---+---------------------+

【讨论】:

如果这里只使用一列,即级别,我想给我自己的 json 结构怎么办?例如,[ first : 3, status : null , second : 2, status : "pending" ] 并保持 uid 列不变。

以上是关于从 PySpark GroupBy 中的两列创建 JSON 字符串的主要内容,如果未能解决你的问题,请参考以下文章

在不包括当前行的两列之间使用pandas groupby除法创建一个新列

如何比较pyspark中两个不同数据帧中的两列

使用 PySpark 连接与另一列中的两列确定的范围相匹配的数据框

在 pyspark 中比较不同数据框中的两列,分别为 String 和 Array<string> 类型

在pyspark中添加数据类型为字符串格式的两列的值

Python:从 DataFrame 中的两列创建结构化 numpy 结构化数组