从 pyspark 数据框中删除空列

Posted

技术标签:

【中文标题】从 pyspark 数据框中删除空列【英文标题】:Remove null columns from a pyspark dataframe 【发布时间】:2020-04-16 05:11:34 【问题描述】:

我有一个非常脏的 csv,其中有几列只有空值。

我想删除它们。我正在尝试选择列中空值计数不等于行数的所有列。

clean_df = bucketed_df.select([c for c in bucketed_df.columns if count(when(isnull(c), c)) not bucketed_df.count()])

但是,我收到此错误:

SyntaxError: invalid syntax
  File "<command-2213215314329625>", line 1
    clean_df = bucketed_df.select([c for c in bucketed_df.columns if count(when(isnull(c), c)) not bucketed_df.count()])
                                                                                                             ^
SyntaxError: invalid syntax

如果有人能帮我摆脱这些脏栏,那就太好了。

【问题讨论】:

这能回答你的问题吗? Drop if all entries in a spark dataframe's specific column is null 您可以简单地进行正常计数并检查哪些列返回 0。 其实可以,但是真的很慢。运行一个大约需要 10 分钟。不可行,因为我需要在许多数据集上多次执行此操作。 我认为您找不到比单次计数更快的解决方案,但可能会有更好的解决方案。 【参考方案1】:

我对 pyspark 的经验很少,但使用计数创建一个数据框并将其转换为 pandas 可能不是一个坏主意,因为计数数据框将只有一行:

从如下所示的数据框开始并保存为null_df

+---+---+---+----+
|  A|  B|  C|   D|
+---+---+---+----+
|  1|  a|  b|null|
|  2|  c|  d|null|
|  3|  e|  f|null|
+---+---+---+----+

import pyspark.sql.functions as F

counts = null_df.select([F.count(i).alias(i) for i in null_df.columns]).toPandas()
output = null_df.select(*counts.columns[counts.ne(0).iloc[0]])

或者甚至将整个第一行转换为字典,然后循环遍历字典

counts1 = null_df.select([F.count(i).alias(i) for i in null_df.columns])
output2 = null_df.select([k for k,v in counts1.first().asDict().items() if v >0])

它给出了以下内容:

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  a|  b|
|  2|  c|  d|
|  3|  e|  f|
+---+---+---+

在我的系统中测试的基准:

%%timeit
counts = null_df.select([F.count(i).alias(i) for i in null_df.columns]).toPandas()
output = null_df.select(*counts.columns[counts.ne(0).iloc[0]])
#8.73 s ± 412 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
counts1 = null_df.select([F.count(i).alias(i) for i in null_df.columns])
output2 = null_df.select([k for k,v in counts1.first().asDict().items() if v >0])
#9.43 s ± 253 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
counts1 = null_df.select([F.count(i).alias(i) for i in null_df.columns])
output1 = null_df.select([c for c in counts1.columns if counts1[[c]].first()[c] > 0])
#35.3 s ± 1 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

【讨论】:

【参考方案2】:

有两种方法可以解决这个问题,

1) 编写一个 UDF 函数来添加一个列,如果所需列(您正在检查 NULL 的列)值为 NULL,则该列的值为 1,如果总和相等,则取该列的总和到行数,然后删除列

2) 使用 amazon spark dq 库(Spark 的开源数据质量库),该库具有分析数据的功能,dq 返回的列之一是每列的完整性因子,如果完整性因子为 1 则整个列值为 NULL ,您可以删除这些列 我个人觉得这个选项很棒,因为它旨在使用 spark 执行数据质量检查

下面的链接中有很多例子

https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/ https://github.com/awslabs/deequ

【讨论】:

【参考方案3】:

可以使用函数“min”,并且可以删除具有空值的列。在 Scala 上,可以很容易地翻译成 Python:

// data sample
val df = Seq(("Bug", null.asInstanceOf[Integer], null.asInstanceOf[String]),
  ("Termit", null.asInstanceOf[Integer], null.asInstanceOf[String]))
  .toDF("name", "size", "type")
val fieldNames = df.schema.fieldNames

// get null values 
val fieldExpressions = fieldNames.map(c => min(c).alias(c))
val firstRow = df.select(fieldExpressions: _*).collect().head

val fieldsToDrop = fieldNames.filter(f => firstRow.isNullAt(fieldNames.indexOf(f)))

【讨论】:

以上是关于从 pyspark 数据框中删除空列的主要内容,如果未能解决你的问题,请参考以下文章

删除空列的快速方法 [PySpark]

在 PySpark DataFrame 中添加多个空列

如何使用模式匹配从 pyspark 数据框中删除行?

从 pyspark 数据框中删除第一行

从 PySpark 中的数据框中删除重复项

从 PySpark 中的数据框中删除重复项