Pyspark Dataframe 将条件添加到`reduce(add,(F.col(x) ... `
Posted
技术标签:
【中文标题】Pyspark Dataframe 将条件添加到`reduce(add,(F.col(x) ... `【英文标题】:Pyspark Dataframe add condition to `reduce(add,(F.col(x) ... ` 【发布时间】:2020-05-14 09:26:31 【问题描述】:让我们考虑如下数据框df
:
df.show()
+-----+-----+-----+-----+-----+
|col A|val_1|val_2|val_3|val_4|
+-----+-----+-----+-----+-----+
|city1| 100| 100| 200| 100|
|city2| 200| 300| 300| 100|
|city1| 100| 100| 100| 100|
|city2| 500| 200| 200| 200|
+-----+-----+-----+-----+-----+
如果我想在列 val_i 中添加值并将它们放入新列 sum,我可以执行以下操作:
from functools import reduce
from operator import add
val_cols = [x for x in df.columns if 'val' in x]
df.withColumn('sum', (reduce(add,(F.col(x) for x in val_cols)))).show()
+-----+-----+-----+-----+-----+----+
|col A|val_1|val_2|val_3|val_4| sum|
+-----+-----+-----+-----+-----+----+
|city1| 100| 100| 200| 100| 500|
|city2| 200| 300| 300| 100| 900|
|city1| 100| 100| 100| 100| 400|
|city2| 500| 200| 200| 200|1100|
+-----+-----+-----+-----+-----+----+
如何在(reduce(add,(F.col(x) ...
参数中添加条件?例如,如果我只想包含超过 200 的值。我试过这个
df.withColumn('sum', (reduce(add,(F.col(x) for x in val_cols if F.col(x)>200)))).show()
但出现以下错误:
ValueError:无法将列转换为布尔值:请使用 '&' 表示 'and'、'|'在构建 DataFrame 布尔表达式时,for 'or', '~' for 'not'。
【问题讨论】:
【参考方案1】:这可以通过使用f.when(...).otherwise(...)
提前构造条件来实现:
functools.reduce(
operator.add,
[f.when(f.col(c) > 200, f.col(c)).otherwise(f.lit(0)) for c in df1.columns]
)
【讨论】:
以上是关于Pyspark Dataframe 将条件添加到`reduce(add,(F.col(x) ... `的主要内容,如果未能解决你的问题,请参考以下文章
如何将字典中的多个值添加到 PySpark Dataframe
将具有最接近值的列添加到 PySpark Dataframe
从另一个 DataFrame 将列添加到 Pyspark DataFrame
PySpark向现有DataFrame添加列 - TypeError:无效参数,不是字符串或列