计算 PySpark DataFrame 列的模式?

Posted

技术标签:

【中文标题】计算 PySpark DataFrame 列的模式?【英文标题】:Calculate the mode of a PySpark DataFrame column? 【发布时间】:2016-01-05 08:23:09 【问题描述】:

对于DataFrame中的所有列,最终我想要的是列的模式。对于其他摘要统计信息,我看到了几个选项:使用 DataFrame 聚合,或将 DataFrame 的列映射到向量的 RDD(我也遇到了麻烦)并使用 MLlib 中的colStats。但我不认为模式是一种选择。

【问题讨论】:

【参考方案1】:

众数的问题与中位数的问题几乎相同。虽然它很容易计算,但计算成本相当高。可以使用排序后跟本地和全局聚合或使用 just-another-wordcount 和过滤器来完成:

import numpy as np
np.random.seed(1)

df = sc.parallelize([
    (int(x), ) for x in np.random.randint(50, size=10000)
]).toDF(["x"])

cnts = df.groupBy("x").count()
mode = cnts.join(
    cnts.agg(max("count").alias("max_")), col("count") == col("max_")
).limit(1).select("x")
mode.first()[0]
## 0

无论哪种方式,都可能需要对每一列进行完全随机播放。

【讨论】:

它给了我错误:AttributeError: 'str' object has no attribute 'alias' @Ajinkya 这意味着你使用的是builtins.max 而不是pyspark.sql.functions.max 能否请您告诉如何处理列的模式为空/缺失值的情况。然后我们应该取第二高的出现值。谢谢 @Ajinkya 用na.drop("column_name")追加这个【参考方案2】:

这一行将为您提供 spark 数据帧 df 中“col”的模式:

df.groupby("col").count().orderBy("count", ascending=False).first()[0]

有关 df 使用中所有列的模式列表:

[df.groupby(i).count().orderBy("count", ascending=False).first()[0] for i in df.columns]

要添加名称以识别哪个列的模式,请制作 2D 列表:

[[i,df.groupby(i).count().orderBy("count", ascending=False).first()[0]] for i in df.columns]

【讨论】:

【参考方案3】:

您可以使用Java代码计算列模式如下:

            case MODE:
                Dataset<Row> cnts = ds.groupBy(column).count();
                Dataset<Row> dsMode = cnts.join(
                        cnts.agg(functions.max("count").alias("max_")),
                        functions.col("count").equalTo(functions.col("max_")
                        ));
                Dataset<Row> mode = dsMode.limit(1).select(column);
                replaceValue = ((GenericRowWithSchema) mode.first()).values()[0];
                ds = replaceWithValue(ds, column, replaceValue);
                break;

private static Dataset<Row> replaceWithValue(Dataset<Row> ds, String column, Object replaceValue) 
    return ds.withColumn(column,
            functions.coalesce(functions.col(column), functions.lit(replaceValue)));

【讨论】:

您的代码示例似乎从“switch”块的中间开始,并且有特殊的缩进。您的示例开头有什么内容吗? 是的,只是模式计算示例的代码块【参考方案4】:

>>> df=newdata.groupBy('columnName').count()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]

See My result

>>> newdata.groupBy('var210').count().show()
+------+-----+
|var210|count|
+------+-----+
|  3av_|   64|
|  7A3j|  509|
|  g5HH| 1489|
|  oT7d|  109|
|  DM_V|  149|
|  uKAI|44883|
+------+-----+

# store the above result in df
>>> df=newdata.groupBy('var210').count()
>>> df.orderBy(df['count'].desc()).collect()
[Row(var210='uKAI', count=44883),
Row(var210='g5HH', count=1489),
Row(var210='7A3j', count=509),
Row(var210='DM_V', count=149),
Row(var210='oT7d', count=109),
Row(var210='3av_', count=64)]

# get the first value using collect()
>>> mode = df.orderBy(df['count'].desc()).collect()[0][0]
>>> mode
'uKAI'

使用 groupBy() 函数获取列中每个类别的计数。 df 是我的结果数据框有两列 var210,count。使用 orderBy() 并按降序排列列名“count”,在数据帧的第一行给出最大值。 collect()[0][0] 用于获取数据帧中的第 1 个元组

【讨论】:

【参考方案5】:

以下方法可以帮助您获取输入数据帧的所有列的模式

from pyspark.sql.functions import monotonically_increasing_id

def get_mode(df):
    column_lst = df.columns
    res = [df.select(i).groupby(i).count().orderBy("count", ascending=False) for i in column_lst]
    df_mode = res[0].limit(1).select(column_lst[0]).withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
    
    for i in range(1, len(res)):
        df2 = res[i].limit(1).select(column_lst[i]).withColumn("temp_name_monotonically_increasing_id", monotonically_increasing_id())
        df_mode = df_mode.join(df2, (df_mode.temp_name_monotonically_increasing_id == df2.temp_name_monotonically_increasing_id)).drop(df2.temp_name_monotonically_increasing_id)
        
    return df_mode.drop("temp_name_monotonically_increasing_id")

【讨论】:

以上是关于计算 PySpark DataFrame 列的模式?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:转换DataFrame中给定列的值

Pyspark 从具有不同列的行/数据创建 DataFrame

基于pyspark中仅一列的两个DataFrame之间的差异[重复]

跨 PySpark DataFrame 列的字符串匹配

Pyspark - 从 DataFrame 列的操作创建新列给出错误“列不可迭代”

如何将 PySpark Dataframe 列的类型指定为 JSON