从 Pandas groupBy 到 PySpark groupBy

Posted

技术标签:

【中文标题】从 Pandas groupBy 到 PySpark groupBy【英文标题】:From Pandas groupBy to PySpark groupBy 【发布时间】:2017-03-14 02:08:23 【问题描述】:

考虑一个 Spark DataFrame,其中我们有很少的列。目标是在不将其转换为 Pandas DataFrame 的情况下对其执行 groupBy 操作。等效的 Pandas groupBy 代码如下所示:

def compute_metrics(x):
    return pd.Series(
        'a': x['a'].values[0],
        'new_b': np.sum(x['b']),
        'c': np.mean(x['c']),
        'cnt': len(x)
    )

data.groupby([
    'col_1',
    'col_2'
]).apply(compute_metrics).reset_index()

我打算用 PySpark 写这个。到目前为止,我在PySpark 中提出了类似的内容:

gdf = df.groupBy([
    'col_1',
    'col_2'
]).agg(
    'c': 'avg',
    'b': 'sum'
).withColumnRenamed('sum(b)', 'new_b')

但是,我不确定如何处理 'a': x['a'].values[0]'cnt': len(x)。我曾想过使用from pyspark.sql import functions 中的collect_list,但这会用Column object is not Callable 打我的脸。知道如何完成上述转换吗?谢谢!

[UPDATE]any 列执行count 操作以获得cnt 是否有意义?说我这样做:

gdf = df.groupBy([
    'col_1',
    'col_2'
]).agg(
    'c': 'avg',
    'b': 'sum',
    'some_column': 'count'
).withColumnRenamed('sum(b)', 'new_b')
  .withColumnRenamed('count(some_column)', 'cnt')

【问题讨论】:

【参考方案1】:

我有这个使用 PySpark 函数 sumavgcountfirst 的玩具解决方案。 注意,我在此解决方案中使用 Spark 2.1。希望对您有所帮助!

from pyspark.sql.functions import sum, avg, count, first

# create toy example dataframe with column 'A', 'B' and 'C'
ls = [['a', 'b',3], ['a', 'b', 4], ['a', 'c', 3], ['b', 'b', 5]]
df = spark.createDataFrame(ls, schema=['A', 'B', 'C'])

# group by column 'A' and 'B' then performing some function here
group_df = df.groupby(['A', 'B'])
df_grouped = group_df.agg(sum("C").alias("sumC"), 
                          avg("C").alias("avgC"), 
                          count("C").alias("countC"), 
                          first("C").alias("firstC"))
df_grouped.show() # print out the spark dataframe

【讨论】:

谢谢!你的解决方案比我丑陋的 sn-p 更优雅。我仍然想知道如何处理'a': x['a'].values[0] 在这种情况下@KevinGhaboosi values[0] 是什么?它是分组依据的第一个元素吗? 这里要注意。对于更复杂的函数,可以编写UDF(用户定义函数)以便在group by之后应用。 感谢@titipat 的评论。是的。没错! 有没有办法与 pandas apply() 等效,因为整个子数据帧可以通过另一个函数进行传递和操作?

以上是关于从 Pandas groupBy 到 PySpark groupBy的主要内容,如果未能解决你的问题,请参考以下文章

Pandas:groupby A 列并从其他列创建元组列表?

从 pandas groupby 对象中选择多个组

从 Pandas Groupby 数据框创建等高线图

Python Pandas 从 Groupby 中选择随机组样本

Pandas 0.18.1 groupby 和多级聚合错误重新采样

python pandas - 处理嵌套 groupby 的最佳方法