pyspark groupby 并创建包含其他列字典的列
Posted
技术标签:
【中文标题】pyspark groupby 并创建包含其他列字典的列【英文标题】:pyspark groupby and create column containing a dictionary of the others columns 【发布时间】:2021-10-22 14:36:25 【问题描述】:我有这个 pyspark 数据框
df = spark.createDataFrame([("a", "b", "v1", 1234, 56, 78, 9), ("a", "b", "v2", 987, 6, 543, 21), ("c", "d", "v1", 12, 345, 6, 789), ("c", "d", "v2", 9, 876, 5, 4321)], ("k1", "k2", "k3", "ca", "pa", "cb", "pb"))
df.show()
+---+---+---+----+---+---+----+
| k1| k2| k3| ca| pa| cb| pb|
+---+---+---+----+---+---+----+
| a| b| v1|1234| 56| 78| 9|
| a| b| v2| 987| 6|543| 21|
| c| d| v1| 12|345| 6| 789|
| c| d| v2| 9|876| 5|4321|
+---+---+---+----+---+---+----+
基本上,我想要做的是通过对前两个键 k1
和 k2
进行分组来转换此数据帧,并使用第三个键 k3
作为字典的主键,该字典将其他列的值(ca
、pa
、cb
、pb
),这将包含在一个新列中。这种转换将导致数据框看起来完全像这样:
+---+---+--------------------------------------------------------------------------------------------------+
|k1 |k2 |k3 |
+---+---+--------------------------------------------------------------------------------------------------+
|c |d |"v1": "pa": 345, "pb": 789, "ca": 12, "cb": 6, "v2": "pa": 876, "pb": 4321, "ca": 9, "cb": 5|
|a |b |"v1": "pa": 56, "pb": 9, "ca": 1234, "cb": 78, "v2": "pa": 6, "pb": 21, "ca": 987, "cb": 543|
+---+---+--------------------------------------------------------------------------------------------------+
为此,我编写了以下代码,但我认为可以改进此代码(使用 pandas_udf 或其他东西),但我没有找到更好的解决方案,我正在寻找任何可能导致的建议/指导更优雅、更高效的解决方案。
def reoganize_col(list_json):
col_data =
for d in list_json:
print(d)
for k,v in d.items():
col_data[k] = v
return json.dumps(col_data)
udf_reoganize_col = F.udf(reoganize_col, T.StringType())
df = df.withColumn('x', F.create_map(F.lit('ca'), F.col('ca'),
F.lit('cb'), F.col('cb'),
F.lit('pa'), F.col('pa'),
F.lit('pb'), F.col('pb')))
.groupby(['k1', 'k2']).agg(F.collect_list(F.create_map(F.col('k3'), F.col('x'))).alias('k3'))
df = df.withColumn('k3', udf_reoganize_col(F.col('k3')))
【问题讨论】:
【参考方案1】:您的解决方案即将完成。我建议您使用to_json
代替UDF 来提高性能,并使用struct
代替map
使代码更干净。
(df
.groupBy('k1', 'k2')
.agg(F.collect_list(F.struct('k3', F.struct('pa', 'pb', 'ca', 'cb'))).alias('k3'))
.withColumn('k3', F.to_json(F.map_from_entries('k3')))
.show(10, False)
)
# Output
# +---+---+---------------------------------------------------------------------------------+
# |k1 |k2 |k3 |
# +---+---+---------------------------------------------------------------------------------+
# |c |d |"v1":"pa":345,"pb":789,"ca":12,"cb":6,"v2":"pa":876,"pb":4321,"ca":9,"cb":5|
# |a |b |"v1":"pa":56,"pb":9,"ca":1234,"cb":78,"v2":"pa":6,"pb":21,"ca":987,"cb":543|
# +---+---+---------------------------------------------------------------------------------+
【讨论】:
非常感谢它就像一个魅力。我不知道map_from_entries
函数。我在使用 struct
函数获取期望结果时遇到问题。以上是关于pyspark groupby 并创建包含其他列字典的列的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Pyspark 中使用 groupby 和数组元素?
将 SQL 代码转换为 PySpark 的问题;我在哪里用 groupby 和 count 创建一个新的 DF