(pySpark 中分组数据的模式

Posted

技术标签:

【中文标题】(pySpark 中分组数据的模式【英文标题】:Mode of grouped data in (py)Spark 【发布时间】:2016-04-15 18:13:20 【问题描述】:

我有一个包含多列的 spark DataFrame。我想根据一列对行进行分组,然后为每个组找到第二列的模式。使用 pandas DataFrame,我会做这样的事情:

rand_values = np.random.randint(max_value,
                                size=num_values).reshape((num_values/2, 2))
rand_values = pd.DataFrame(rand_values, columns=['x', 'y'])
rand_values['x'] = rand_values['x'] > max_value/2
rand_values['x'] = rand_values['x'].astype('int32')

print(rand_values)
##    x  y
## 0  0  0
## 1  0  4
## 2  0  1
## 3  1  1
## 4  1  2

def mode(series):
    return scipy.stats.mode(series['y'])[0][0]

rand_values.groupby('x').apply(mode)
## x
## 0    4
## 1    1
## dtype: int64

在pyspark中,我可以找到单列的模式

df = sql_context.createDataFrame(rand_values)

def mode_spark(df, column):
    # Group by column and count the number of occurrences
    # of each x value
    counts = df.groupBy(column).count()

    # - Find the maximum value in the 'counts' column
    # - Join with the counts dataframe to select the row
    #   with the maximum count
    # - Select the first element of this dataframe and
    #   take the value in column
    mode = counts.join(
        counts.agg(F.max('count').alias('count')),
        on='count'
    ).limit(1).select(column)

    return mode.first()[column]

mode_spark(df, 'x')
## 1
mode_spark(df, 'y')
## 1

我不知道如何将该功能应用于分组数据。如果不能直接将这个逻辑应用到DataFrame上,是否可以通过其他方式达到同样的效果?

提前谢谢你!

【问题讨论】:

按 (x, y) 分组,与计数聚合,选择最大行如下所示:***.com/a/35226857/1560062 感谢您的快速回复!我会试试这个! 看起来成功了!我是否可以添加我的解决方案,但将您标记为“回答者”? 继续回答我不介意。您可以链接问题中的原始答案作为参考。当您获得所需的特权时,您可以投票其他答案:) 【参考方案1】:

zero323 建议的解决方案。

原解决方案:https://***.com/a/35226857/1560062

首先,计算每个 (x, y) 组合的出现次数。

counts = df.groupBy(['x', 'y']).count().alias('counts')
counts.show()
## +---+---+-----+
## |  x|  y|count|
## +---+---+-----+
## |  0|  1|    2|
## |  0|  3|    2|
## |  0|  4|    2|
## |  1|  1|    3|
## |  1|  3|    1|
## +---+---+-----+

解决方案 1:按“x”分组,通过取每组中计数的最大值进行聚合。最后,删除“计数”列。

result = (counts
          .groupBy('x')
          .agg(F.max(F.struct(F.col('count'),
                              F.col('y'))).alias('max'))
          .select(F.col('x'), F.col('max.y'))
         )
result.show()
## +---+---+
## |  x|  y|
## +---+---+
## |  0|  4|
## |  1|  1|
## +---+---+

解决方案 2:使用窗口,按“x”分区,按“计数”列排序。现在,选择每个分区中的第一行。

win = Window().partitionBy('x').orderBy(F.col('count').desc())
result = (counts
          .withColumn('row_num', F.rowNumber().over(win))
          .where(F.col('row_num') == 1)
          .select('x', 'y')
         )
result.show()
## +---+---+
## |  x|  y|
## +---+---+
## |  0|  1|
## |  1|  1|
## +---+---+

由于行的排序方式,这两个结果具有不同的结果。如果没有平局,这两种方法给出相同的结果。

【讨论】:

F.rowNumber() 在更高版本的 pyspark 中不起作用。请改用 F.rank()

以上是关于(pySpark 中分组数据的模式的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark数据框中找到没有分组的累积频率

如何在pyspark中将分组数据存储到json中

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

PySpark 分组并逐行应用 UDF 操作

pyspark - 分组和计算数据

如何在 PySpark 中进行分组并查找列的唯一项目 [重复]