如何在pyspark中使用等效的熊猫轴来删除列而不是行?

Posted

技术标签:

【中文标题】如何在pyspark中使用等效的熊猫轴来删除列而不是行?【英文标题】:How to drop columns and not rows using pandas axis equivalent in pyspark? 【发布时间】:2019-10-29 07:14:36 【问题描述】:

我有一个如下所示的 spark 数据框

df = pd.DataFrame(
'subject_id':[1,1,1,1,2,2,2,2,3,3,4,4,4,4,4],
'readings' : ['READ_1','READ_2','READ_1','READ_3',np.nan,'READ_5',np.nan,'READ_8','READ_10','READ_12','READ_11','READ_14','READ_09','READ_08','READ_07'],
 'val' :[5,np.nan,7,np.nan,np.nan,7,np.nan,np.nan,np.nan,np.nan,np.nan,np.nan,np.nan,np.nan,46],
 )

from pyspark.sql.types import *
from pyspark.sql.functions import isnan, when, count, col

mySchema = StructType([ StructField("subject_id", LongType(), True)\
                       ,StructField("readings", StringType(), True)\
                       ,StructField("val", FloatType(), True)])

spark_df = spark.createDataFrame(df,schema=mySchema)

我想做的是删除具有超过 80% pc 的 NaN, NULL or 0 值的列?

我尝试了类似下面的方法,但它不起作用

spark_df = spark_df.dropna(axis = 'columns',how=any,thresh=12)

以上在pandas 中是可能的,但在这里不起作用。我收到以下错误,这并不奇怪

TypeError: dropna() 得到了一个意外的关键字参数 'axis'

请注意,我的真实数据框是40 million and 3k columns。我提到了这个post,但它还没有答案

pyspark 中是否有与此等价的东西?

我希望我的输出如下所示,只有 2 列

【问题讨论】:

【参考方案1】:

您可以使用dropna 方法中的subset 参数来指定要在其中查找空值的列。

要删除空值超过 80% 的所有列:

columns_to_drop = []
count_before = spark_df.count()

for column_name in spark_df.columns:
    temp_spark_df =  spark_df.dropna(subset=[column_name], how=any, thresh=12)
    count_after = temp_spark_df.count()

    if ((count_before-count_after)/count_before) > 0.8:
        columns_to_drop.append(column_name)


spark_df = spark_df.drop(*columns_to_drop)

【讨论】:

感谢会尝试。但不幸的是,我的数据框非常庞大,多达 4000 万和 3k 列。所以想着会不会快。无论如何都赞成。将尝试尽快更新答案。谢谢【参考方案2】:

你可以直接使用这个功能,它是最快的:

def drop_null_columns_spark(df, threshold=0):
    """
    This function drops all columns which contain null values with a threshold.
    :param df: A PySpark DataFrame
    :param threshold: Minimum number of nulls to consider dropping of column
    """
    null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
    to_drop = [k for k, v in null_counts.items() if v > threshold]
    df = df.drop(*to_drop)
    return df

【讨论】:

感谢会尝试。赞成。但我已经尝试过类似的方法。对于 1300 列,这不会花费更长的时间吗? @SSMK Count 是一项昂贵的操作,但它是必需的,因为您需要一个空计数权限。集合很便宜,因为您已汇总到一行。

以上是关于如何在pyspark中使用等效的熊猫轴来删除列而不是行?的主要内容,如果未能解决你的问题,请参考以下文章

如何删除一列并创建一个新列而不是在 EF Core 中重命名?

可以隐藏 SlickGrid 列而不将其从“列”数组中删除吗?

MySql:将数据添加到列而不删除以前的数据

重置现有值的标识列而不删除现有记录

如何在现有表中添加额外的列而不丢失数据

如何使用 python 代码显示不相关的列而不绘图