pyspark 数据框为每一行获得第二低的值
Posted
技术标签:
【中文标题】pyspark 数据框为每一行获得第二低的值【英文标题】:pyspark dataframe get second lowest value for each row 【发布时间】:2020-03-02 20:52:59 【问题描述】:如果有人有想法,我想查询如何在 pyspark 中的 Dataframe 行中获取第二低的值。
例如:
输入数据框:
Col1 Col2 Col3 Col4
83 32 14 62
63 32 74 55
13 88 6 46
预期输出:
Col1 Col2 Col3 Col4 Res
83 32 14 62 32
63 32 74 55 55
13 88 6 46 13
【问题讨论】:
***.com/a/43188621/797495 【参考方案1】:我们可以使用 concat_ws
函数来连接该行的所有列,然后使用 split
创建一个数组。
使用array_sort
函数对数组进行排序,提取数组的second element[1]
。
Example:
from pyspark.sql.functions import *
df=spark.createDataFrame([('83','32','14','62'),('63','32','74','55'),('13','88','6','46')],['Col1','Col2','Col3','Col4'])
df.selectExpr("array_sort(split(concat_ws(',',Col1,Col2,Col3,Col4),','))[1] Res").show()
#+---+
#|Res|
#+---+
#|32 |
#|55 |
#|13 |
#+---+
More Dynamic Way:
df.selectExpr("array_sort(split(concat_ws(',',*),','))[1]").show()
#+---+
#|Res|
#+---+
#|32 |
#|55 |
#|13 |
#+---+
EDIT:
#adding Res column to the dataframe
df1=df.selectExpr("*","array_sort(split(concat_ws(',',*),','))[1] Res")
df1.show()
#+----+----+----+----+---+
#|Col1|Col2|Col3|Col4|Res|
#+----+----+----+----+---+
#| 83| 32| 14| 62| 32|
#| 63| 32| 74| 55| 55|
#| 13| 88| 6| 46| 46|
#+----+----+----+----+---+
【讨论】:
太棒了!你知道如何将新列 Res 添加到数据框 @Drachens,当然!请查看我的更新答案EDIT
部分!
您不需要连接列然后拆分以获得数组。你可以简单地使用array(*cols)
。【参考方案2】:
您可以使用array
函数创建一个数组列,然后使用array_sort
对其进行排序。最后,使用element_at
获取第二个元素。 Spark 2.4+ 提供了最后两个函数。
df.withColumn("res", element_at(array_sort(array(*[col(c) for c in df.columns])), 2))\
.show()
#+----+----+----+----+---+
#|Col1|Col2|Col3|Col4|res|
#+----+----+----+----+---+
#|83 |32 |14 |62 |32 |
#|63 |32 |74 |55 |55 |
#|13 |88 |6 |46 |13 |
#+----+----+----+----+---+
另一种做法是使用least
函数。首先,计算所有列的最小值,然后使用when
表达式从大于min
的值中计算另一个时间的最小值:
df.withColumn("min", least(*[col(c) for c in df.columns]))\
.withColumn("res", least(*[when(col(c) > col("min"), col(c)) for c in df.columns]))\
.drop("min")\
.show()
【讨论】:
以上是关于pyspark 数据框为每一行获得第二低的值的主要内容,如果未能解决你的问题,请参考以下文章
使用较低的函数将pyspark数据框中单列中的值转换为文本清理中的小写[重复]
我可以创建一个以数据框为元素的数据框吗? (使用 sqlContext 的 Pyspark)