Pyspark 将 StructType 列聚合为每行的元素数组 [重复]

Posted

技术标签:

【中文标题】Pyspark 将 StructType 列聚合为每行的元素数组 [重复]【英文标题】:Pyspark aggregate a StructType column as an Array of its elements for each line [duplicate] 【发布时间】:2019-05-31 15:30:06 【问题描述】:

我正在尝试做一些看起来非常简单但不知怎么用 pyspark 做的事情。

我有一个包含两列的 df(为了简化)“id”和“strcol”,可能有重复的 ids

我想做一个 df.groupBy('id') ,它会为每个 id 返回一个 strcol 值的数组

简单的例子:

|--id--|--strCol--|
|   a  |  'a':1 |
|   a  |  'a':2 |
|   b  |  'b':3 |
|   b  |  'b':4 |
|------|----------|
would become
|--id--|-------aggsStr------|
|   a  |  ['a':1,'a':2] |
|   b  |  ['b':3,'b':4] |
|------|--------------------|

我尝试将 apply 与 pandas udf 一起使用,但它似乎拒绝返回数组。 (或者可能我没有正确使用)

【问题讨论】:

可能是的,它没有出现在我的搜索中。 【参考方案1】:

您可以使用pyspark.sql.functions 模块中的collect_list

from pyspark.sql import functions as F
agg = df.groupby("id").agg(F.collect_list("strCol"))

一个功能齐全的例子:

import pandas as pd
from pyspark.sql import functions as F

data =  'id': ['a', 'a', 'b', 'b'], 'strCol': ['a':1, 'a':2, 'b':3, 'b':4]

df_aux = pd.DataFrame(data)

# df type: DataFrame[id: string, strCol: map<string,bigint>]
df = spark.createDataFrame(df_aux) 


# agg type: # DataFrame[id: string, collect_list(strCol): array<map<string,bigint>>]
agg = df.groupby("id").agg(F.collect_list("strCol")) 

希望这有帮助!

【讨论】:

以上是关于Pyspark 将 StructType 列聚合为每行的元素数组 [重复]的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 将列列表转换为聚合函数

Spark 2.1.1 上的 Pyspark,StructType 中的 StructFields 始终可以为空

PySpark:TypeError:StructType 不能接受类型为 <type 'unicode'> 或 <type 'str'> 的对象

在 groupby 操作 PySpark 中聚合列中的稀疏向量

如何将具有嵌套StructType的列转换为Spark SQL中的类实例?

如何将StructType从Spark中的json数据框分解为行而不是列