如何在 PySpark 中进行分组并查找列的唯一项目 [重复]

Posted

技术标签:

【中文标题】如何在 PySpark 中进行分组并查找列的唯一项目 [重复]【英文标题】:How to do groupby and find unique items of a column in PySpark [duplicate] 【发布时间】:2019-06-19 07:58:01 【问题描述】:

我有一个 pySpark 数据框,我想按列分组,然后在每个组的另一列中查找唯一项。

在熊猫中我可以做到,

data.groupby(by=['A'])['B'].unique()

我想对我的 spark 数据框做同样的事情。我可以找到组中项目的 distictCount 并计数,就像这样

(spark_df.groupby('A')
    .agg(
        fn.countDistinct(col('B'))
            .alias('unique_count_B'),
        fn.count(col('B'))
            .alias('count_B')
        )
    .show())

但我找不到一些功能来查找组中的独特项目。

为了澄清更多,请考虑示例数据框,

df = spark.createDataFrame(
  [(1, "a"), (1, "b"), (1, "a"), (2, "c")],
  ["A", "B"])

我希望得到这样的输出,

+---+----------+
|  A|  unique_B|
+---+----------+
|  1|  [a, b]  |
|  2|  [c]     |
+---+----------+

如何在 pySpark 中像 pandas 一样获取输出?

【问题讨论】:

【参考方案1】:

您可以使用以下代码,该代码使用 Window 函数。

from pyspark.sql import functions as F
from pyspark.sql import Window

df = spark.createDataFrame(
  [(1, "a"), (1, "b"), (1, "a"), (2, "c")],
  ["A", "B"])
win = Window.partitionBy("A", "B")

df.withColumn("distinct AB", 
  F.count("*").over(win)).distinct().show()

结果是:

+---+---+-----------+
|  A|  B|distinct AB|
+---+---+-----------+
|  2|  c|          1|
|  1|  a|          2|
|  1|  b|          1|
+---+---+-----------+

【讨论】:

我已经用我需要的示例数据框和输出更新了我的问题。 在这种情况下,请尝试使用pyspark.sql.functions.collect_set 而不是F.count 是的,我刚刚想通了【参考方案2】:

我使用collect_set 来达到这样的目的,

(df.groupby('A')
    .agg(
        fn.collect_set(col('B')).alias('unique_count_B')
    )
    .show())

我根据需要得到以下输出,

+---+--------------+
|  A|unique_count_B|
+---+--------------+
|  1|        [b, a]|
|  2|           [c]|
+---+--------------+

【讨论】:

【参考方案3】:

您可以为此使用列表推导 分组后假设数据框位于spark_df,您可以使用:

[row.k for row in spark_df.select('k').distinct().collect()]

【讨论】:

我不是在寻找有循环的东西

以上是关于如何在 PySpark 中进行分组并查找列的唯一项目 [重复]的主要内容,如果未能解决你的问题,请参考以下文章

(Pyspark - 在一段时间内按用户分组

在 sql 中查找唯一的数据网格

如何按 RDD 中的选定字段数进行分组,以查找基于这些字段的重复项

如何在pyspark中将分组数据存储到json中

获取由 PySpark Dataframe 上的另一列分组的列的不同元素

如何根据pyspark中的索引查找数组列的平均值