Pyspark 忽略 pyspark-sql-functions 中数据帧的过滤
Posted
技术标签:
【中文标题】Pyspark 忽略 pyspark-sql-functions 中数据帧的过滤【英文标题】:Pyspark ignoring filtering of dataframe inside pyspark-sql-functions 【发布时间】:2019-08-12 07:06:47 【问题描述】:早上好,
我对一些 pyspark 代码有疑问。 假设我们有一个这样的数据框:
+---------+--------+-------+--------+
| p_key_1 | p_key_2| status| value_1|
+---------+--------+-------+--------+
| 13| 42| True| 33|
| 13| 42| True| 12|
| 13| 42| True| 106|
| 13| 42| False| 0|
| 13| 42| False| 27|
+---------+--------+-------+--------+
p_key_1
和 p_key_1
是分区键,但为了使这个例子更小,我没有改变它们。
由两个p_keys
分区我想在所有带有status = True
的行中添加一个包含value_1
最小值的列。
我想这样做:
my_win= Window.partitionBy('p_key_1', 'p_key_2')
my_df.withColumn('new_col', F.min(my_df.where(F.col('status') == True).value_1).over(my_win))
我的问题是,F.min(...)
函数内部的过滤被完全忽略,因此new_col
的所有值最终都为 0,从而产生了这个数据框:
+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
| 13| 42| True| 33| 12|
| 13| 42| True| 12| 12|
| 13| 42| True| 106| 12|
| 13| 42| False| 0| 12|
| 13| 42| False| 27| 12|
+---------+--------+-------+--------+--------+
我想得到的数据框是这样的:
+---------+--------+-------+--------+--------+
| p_key_1 | p_key_2| status| value_1| new_col|
+---------+--------+-------+--------+--------+
| 13| 42| True| 33| 0|
| 13| 42| True| 12| 0|
| 13| 42| True| 106| 0|
| 13| 42| False| 0| 0|
| 13| 42| False| 27| 0|
+---------+--------+-------+--------+--------+
所以我的问题是:
为什么这不起作用以及有哪些替代实现?
【问题讨论】:
还添加结果数据框的外观 添加了我得到的数据框和我真正想要的数据框示例。 我认为您需要在F.min().over()
中使用F.when()
,而不是my_df.where()
。
my_df.withColumn('new_col', F.min(F.when(F.col('status') == True), F.col(value_1)).over(my_win))
那应该做你想要的。
非常感谢,这对我有用。如果你愿意,你可以发表你的建议作为答案,我会接受的。
【参考方案1】:
实现所需的最简单方法是使用when()
而不是df.where()
。
从您的示例中获取变量 -
my_win = Window.partitionBy('p_key_1', 'p_key_2') # your window spec
my_df.withColumn('new_col', F.min(F.when(F.col('status') == True), F.col(value_1)).over(my_win))
仅当 status
字段为 True
时,new_col
字段才给出字段 value_1
的最小值。
【讨论】:
【参考方案2】:这是一种方法:
( 我的_df .withColumn('temp_col', F.when(F.col('status') == True, F.col('value_1'))) .withColumn( 'new_col', F.min('temp_col').over(my_win) ) .drop('temp_col') )重点是创建一个临时列,仅当状态为True
和Null
时才在其中存储值,如果状态为False
。然后你取这个 temp_col 的min
和Null
值将被忽略。
【讨论】:
以上是关于Pyspark 忽略 pyspark-sql-functions 中数据帧的过滤的主要内容,如果未能解决你的问题,请参考以下文章
批量从Dataframe插入到DB,忽略Pyspark中的失败行