PySpark DataFrames - 使用不同类型的列之间的比较进行过滤

Posted

技术标签:

【中文标题】PySpark DataFrames - 使用不同类型的列之间的比较进行过滤【英文标题】:PySpark DataFrames - filtering using comparisons between columns of different types 【发布时间】:2019-01-31 10:07:35 【问题描述】:

假设您有一个数据框,其中包含各种类型的列(字符串、双精度...)和一个特殊值“miss”,它表示字符串类型列中的“缺失值”。

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.getOrCreate()

pdf = pd.DataFrame([
    [1, 'miss'],
    [2, 'x'],
    [None, 'y']
], columns=['intcol', 'strcol'])

df = spark.createDataFrame(data=pdf)

我正在尝试使用如下过滤来计算每列的非缺失值的数量:

col = df['strcol']
df.filter(col.isNotNull() & (col != 'miss')).show()

适用于字符串列:

+------+------+
|intcol|strcol|
+------+------+
|   2.0|     x|
|   NaN|     y|
+------+------+

但是,对于数字列,它会过滤掉所有行:

col = df['intcol']
df.filter(col.isNotNull() & (col != 'miss')).show()
+------+------+
|intcol|strcol|
+------+------+
+------+------+

这似乎是因为数字列与字符串值的跨类型比较导致全空值:

df.select(df['intcol'] != 'miss').show()
+---------------------+
|(NOT (intcol = miss))|
+---------------------+
|                 null|
|                 null|
|                 null|
+---------------------+

我觉得有点意外(例如,1 != '' 是 True,在“普通”Python 中不是 null)

我的问题其实是几个问题:

为什么交叉类型比较会产生空值? 以“预期方式”测试不同类型的相等/不相等的最佳方法是什么?或者(就我而言)我是否需要包含根据列类型进行切换的单独逻辑? 似乎df.filter(~df['intcol'].isin(['miss'])) 可以完成这项工作,但我想知道这是否效率较低?

【问题讨论】:

【参考方案1】:

让我们从原因开始。 DataFrame API 是适用于 SQL 和 SQL 评估规则的 DSL。每当您对不同类型的对象应用运算符时,CAST 操作就会根据预定义的规则应用于较低优先级的操作数。在一般数值类型中,具有更高的优先级,因此(遵循执行计划df.select(df['intcol'] != 'miss').explain(True)):

== Parsed Logical Plan ==
'Project [NOT (intcol#0 = miss) AS (NOT (intcol = miss))#12]
+- LogicalRDD [intcol#0, strcol#1], false

改写为

== Analyzed Logical Plan ==
(NOT (intcol = miss)): boolean
Project [NOT (intcol#0 = cast(miss as double)) AS (NOT (intcol = miss))#12]
+- LogicalRDD [intcol#0, strcol#1], false

其中'miss'CASTEDdouble,后来又转换为NULL

== Optimized Logical Plan ==
Project [null AS (NOT (intcol = miss))#22]
+- LogicalRDD [intcol#0, strcol#1], false

因为这个操作数的强制转换是未定义的。

由于与 NULL 的相等性也未定义 - Difference between === null and isNull in Spark DataDrame - filter 产生空结果。

现在如何解决这个问题。两者都显式转换:

df.filter(df['intcol'].cast("string") != 'miss')

和 null 安全相等:

df.filter(~df['intcol'].cast("string").eqNullSafe('miss'))

应该可以解决问题。

另外请注意 NaN 的值不是 NULL 并且通过 Pandas 进行的转换是有损的 - Pandas dataframe to Spark dataframe, handling NaN conversions to actual null?

【讨论】:

以上是关于PySpark DataFrames - 使用不同类型的列之间的比较进行过滤的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:具有不同列的 DataFrames 的动态联合

如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引

使用 Pyspark / Dataframes 时,如何将谓词下推到 Cassandra 或限制请求的数据?

使用 Python 的 reduce() 加入多个 PySpark DataFrames

Pyspark DataFrames 中的嵌套 SELECT 查询

使用 pyspark 在循环中附加 Spark DataFrames 的有效方法