Pyspark 忽略 pyspark-sql-functions 中数据帧的过滤

Posted

技术标签:

【中文标题】Pyspark 忽略 pyspark-sql-functions 中数据帧的过滤【英文标题】:Pyspark ignoring filtering of dataframe inside pyspark-sql-functions 【发布时间】:2019-08-12 07:06:47 【问题描述】:

早上好,

我对一些 pyspark 代码有疑问。 假设我们有一个这样的数据框:

+---------+--------+-------+--------+
| p_key_1 | p_key_2| status| value_1|
+---------+--------+-------+--------+
|       13|      42|   True|      33|
|       13|      42|   True|      12|
|       13|      42|   True|     106|
|       13|      42|  False|       0|
|       13|      42|  False|      27|
+---------+--------+-------+--------+

p_key_1p_key_1 是分区键,但为了使这个例子更小,我没有改变它们。

由两个p_keys 分区我想在所有带有status = True 的行中添加一个包含value_1 最小值的列。

我想这样做:

my_win= Window.partitionBy('p_key_1', 'p_key_2')
my_df.withColumn('new_col', F.min(my_df.where(F.col('status') == True).value_1).over(my_win))

我的问题是,F.min(...) 函数内部的过滤被完全忽略,因此new_col 的所有值最终都为 0,从而产生了这个数据框:

+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
|       13|      42|   True|      33|      12|
|       13|      42|   True|      12|      12|
|       13|      42|   True|     106|      12|
|       13|      42|  False|       0|      12|
|       13|      42|  False|      27|      12|
+---------+--------+-------+--------+--------+

我想得到的数据框是这样的:

+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
|       13|      42|   True|      33|       0|
|       13|      42|   True|      12|       0|
|       13|      42|   True|     106|       0|
|       13|      42|  False|       0|       0|
|       13|      42|  False|      27|       0|
+---------+--------+-------+--------+--------+

所以我的问题是:

为什么这不起作用以及有哪些替代实现?

【问题讨论】:

还添加结果数据框的外观 添加了我得到的数据框和我真正想要的数据框示例。 我认为您需要在F.min().over() 中使用F.when(),而不是my_df.where() my_df.withColumn('new_col', F.min(F.when(F.col('status') == True), F.col(value_1)).over(my_win)) 那应该做你想要的。 非常感谢,这对我有用。如果你愿意,你可以发表你的建议作为答案,我会接受的。 【参考方案1】:

实现所需的最简单方法是使用when() 而不是df.where()

从您的示例中获取变量 -

my_win = Window.partitionBy('p_key_1', 'p_key_2') # your window spec

my_df.withColumn('new_col', F.min(F.when(F.col('status') == True), F.col(value_1)).over(my_win))

仅当 status 字段为 True 时,new_col 字段才给出字段 value_1 的最小值。

【讨论】:

【参考方案2】:

这是一种方法:

( 我的_df .withColumn('temp_col', F.when(F.col('status') == True, F.col('value_1'))) .withColumn( 'new_col', F.min('temp_col').over(my_win) ) .drop('temp_col') )

重点是创建一个临时列,仅当状态为TrueNull 时才在其中存储值,如果状态为False。然后你取这个 temp_col 的minNull 值将被忽略。

【讨论】:

以上是关于Pyspark 忽略 pyspark-sql-functions 中数据帧的过滤的主要内容,如果未能解决你的问题,请参考以下文章

PySpark XML 处理 - 忽略不良记录

从数据框批量插入到数据库,忽略 Pyspark 中的失败行

批量从Dataframe插入到DB,忽略Pyspark中的失败行

pyspark 给出以下错误无法在 /tmp 中创建本地目录

PySpark 读取不存在文件时的错误处理

应用转换或连接条件以在 pyspark 或 hive 中实现结果