Pyspark Dataframe 将条件添加到`reduce(add,(F.col(x) ... `

Posted

技术标签:

【中文标题】Pyspark Dataframe 将条件添加到`reduce(add,(F.col(x) ... `【英文标题】:Pyspark Dataframe add condition to `reduce(add,(F.col(x) ... ` 【发布时间】:2020-05-14 09:26:31 【问题描述】:

让我们考虑如下数据框df

df.show()
+-----+-----+-----+-----+-----+
|col A|val_1|val_2|val_3|val_4|
+-----+-----+-----+-----+-----+
|city1|  100|  100|  200|  100|
|city2|  200|  300|  300|  100|
|city1|  100|  100|  100|  100|
|city2|  500|  200|  200|  200|
+-----+-----+-----+-----+-----+

如果我想在列 val_i 中添加值并将它们放入新列 sum,我可以执行以下操作:

from functools import reduce
from operator import add
val_cols = [x for x in df.columns if 'val' in x]
df.withColumn('sum', (reduce(add,(F.col(x) for x in val_cols)))).show()
+-----+-----+-----+-----+-----+----+
|col A|val_1|val_2|val_3|val_4| sum|
+-----+-----+-----+-----+-----+----+
|city1|  100|  100|  200|  100| 500|
|city2|  200|  300|  300|  100| 900|
|city1|  100|  100|  100|  100| 400|
|city2|  500|  200|  200|  200|1100|
+-----+-----+-----+-----+-----+----+

如何在(reduce(add,(F.col(x) ... 参数中添加条件?例如,如果我只想包含超过 200 的值。我试过这个

df.withColumn('sum', (reduce(add,(F.col(x) for x in val_cols if F.col(x)>200)))).show()

但出现以下错误:

ValueError:无法将列转换为布尔值:请使用 '&' 表示 'and'、'|'在构建 DataFrame 布尔表达式时,for 'or', '~' for 'not'。

【问题讨论】:

【参考方案1】:

这可以通过使用f.when(...).otherwise(...)提前构造条件来实现:

functools.reduce(
  operator.add, 
  [f.when(f.col(c) > 200, f.col(c)).otherwise(f.lit(0)) for c in df1.columns]
)

【讨论】:

以上是关于Pyspark Dataframe 将条件添加到`reduce(add,(F.col(x) ... `的主要内容,如果未能解决你的问题,请参考以下文章

如何将字典中的多个值添加到 PySpark Dataframe

将具有最接近值的列添加到 PySpark Dataframe

从另一个 DataFrame 将列添加到 Pyspark DataFrame

PySpark向现有DataFrame添加列 - TypeError:无效参数,不是字符串或列

PySpark:当列是列表时,将列添加到 DataFrame

想将key添加到pyspark dataFrame的爆炸数组中