pyspark 数据框为每一行获得第二低的值

Posted

技术标签:

【中文标题】pyspark 数据框为每一行获得第二低的值【英文标题】:pyspark dataframe get second lowest value for each row 【发布时间】:2020-03-02 20:52:59 【问题描述】:

如果有人有想法,我想查询如何在 pyspark 中的 Dataframe 行中获取第二低的值。

例如:

输入数据框

Col1  Col2  Col3  Col4 
83    32    14    62   
63    32    74    55   
13    88     6    46   

预期输出

Col1  Col2  Col3  Col4 Res
83    32    14    62   32   
63    32    74    55   55   
13    88     6    46   13

【问题讨论】:

***.com/a/43188621/797495 【参考方案1】:

我们可以使用 concat_ws 函数来连接该行的所有列,然后使用 split 创建一个数组。

使用array_sort函数对数组进行排序,提取数组的second element[1]

Example:

from pyspark.sql.functions import *

df=spark.createDataFrame([('83','32','14','62'),('63','32','74','55'),('13','88','6','46')],['Col1','Col2','Col3','Col4'])

df.selectExpr("array_sort(split(concat_ws(',',Col1,Col2,Col3,Col4),','))[1] Res").show()

#+---+
#|Res|
#+---+
#|32 |
#|55 |
#|13 |
#+---+

More Dynamic Way:

df.selectExpr("array_sort(split(concat_ws(',',*),','))[1]").show()

#+---+
#|Res|
#+---+
#|32 |
#|55 |
#|13 |
#+---+

EDIT:

#adding Res column to the dataframe
df1=df.selectExpr("*","array_sort(split(concat_ws(',',*),','))[1] Res")
df1.show()

#+----+----+----+----+---+
#|Col1|Col2|Col3|Col4|Res|
#+----+----+----+----+---+
#|  83|  32|  14|  62| 32|
#|  63|  32|  74|  55| 55|
#|  13|  88|   6|  46| 46|
#+----+----+----+----+---+

【讨论】:

太棒了!你知道如何将新列 Res 添加到数据框 @Drachens,当然!请查看我的更新答案 EDIT 部分! 您不需要连接列然后拆分以获得数组。你可以简单地使用array(*cols)【参考方案2】:

您可以使用array 函数创建一个数组列,然后使用array_sort 对其进行排序。最后,使用element_at 获取第二个元素。 Spark 2.4+ 提供了最后两个函数。

df.withColumn("res", element_at(array_sort(array(*[col(c) for c in df.columns])), 2))\
  .show()

#+----+----+----+----+---+
#|Col1|Col2|Col3|Col4|res|
#+----+----+----+----+---+
#|83  |32  |14  |62  |32 |
#|63  |32  |74  |55  |55 |
#|13  |88  |6   |46  |13 |
#+----+----+----+----+---+

另一种做法是使用least 函数。首先,计算所有列的最小值,然后使用when 表达式从大于min 的值中计算另一个时间的最小值:

df.withColumn("min", least(*[col(c) for c in df.columns]))\
  .withColumn("res", least(*[when(col(c) > col("min"), col(c)) for c in df.columns]))\
  .drop("min")\
  .show()

【讨论】:

以上是关于pyspark 数据框为每一行获得第二低的值的主要内容,如果未能解决你的问题,请参考以下文章

洛谷P3165 [CQOI2014]排序机械臂

使用较低的函数将pyspark数据框中单列中的值转换为文本清理中的小写[重复]

我可以创建一个以数据框为元素的数据框吗? (使用 sqlContext 的 Pyspark)

为每组 pyspark RDD/dataframe 选择随机列

在pyspark中旋转一行的值

如何根据火花数据框中的值的累积总和为每一行分配一个类别?