PySpark Dataframe Aggregation 中包含 null 的计数

Posted

技术标签:

【中文标题】PySpark Dataframe Aggregation 中包含 null 的计数【英文标题】:Count including null in PySpark Dataframe Aggregation 【发布时间】:2017-09-19 17:03:22 【问题描述】:

我正在尝试使用 agg 和 count 对 DataFrame 进行一些计数。

from pyspark.sql import Row ,functions as F
row = Row("Cat","Date")
df = (sc.parallelize
  ([
        row("A",'2017-03-03'),
        row('A',None),
        row('B','2017-03-04'),
        row('B','Garbage'),
        row('A','2016-03-04')
]).toDF())
df = df.withColumn("Casted", df['Date'].cast('date'))
df.show()

(
df.groupby(df['Cat'])
.agg
(
    #F.count(col('Date').isNull() | col('Date').isNotNull()).alias('Date_Count'),
    F.count('Date').alias('Date_Count'),
    F.count('Casted').alias('Valid_Date_Count')
)    
.show()

)

函数 F.count() 只给我非空计数。除了使用“或”条件之外,有没有办法获得包括空值在内的计数。

无效计数似乎不起作用。 & 条件看起来不像预期的那样工作。

(
 df
 .groupby(df['Cat'])
.agg
 (
  F.count('*').alias('count'),    
  F.count('Date').alias('Date_Count'),
  F.count('Casted').alias('Valid_Date_Count'),
  F.count(col('Date').isNotNull() & col('Casted').isNull()).alias('invalid')
 )    
.show()
)

【问题讨论】:

【参考方案1】:

将布尔表达式转换为 intsum

df\
    .groupby(df['Cat'])\
    .agg ( 
        F.count('Date').alias('Date_Count'), 
        F.count('Casted').alias('Valid_Date_Count'), 
        F.sum((~F.isnull('Date')&F.isnull("Casted")).cast("int")).alias("Invalid_Date_Cound")
    ).show()

    +---+----------+----------------+------------------+
    |Cat|Date_Count|Valid_Date_Count|Invalid_Date_Cound|
    +---+----------+----------------+------------------+
    |  B|         2|               1|                 1|
    |  A|         2|               2|                 0|
    +---+----------+----------------+------------------+

【讨论】:

你能看看我最后添加的代码块吗?无效的别名没有给出预期的结果。 @Tronald Dump 你的预期输出是什么 在文章末尾添加预期输出。 给你,你必须对表达式求和

以上是关于PySpark Dataframe Aggregation 中包含 null 的计数的主要内容,如果未能解决你的问题,请参考以下文章

PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

PySpark:转换DataFrame中给定列的值

PySpark|比RDD更快的DataFrame

Pyspark:将 pyspark.sql.row 转换为 Dataframe

是否可以在 Pyspark 中对 DataFrame 进行子类化?

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe