为 PySpark 中的最大值选择每行的列名
Posted
技术标签:
【中文标题】为 PySpark 中的最大值选择每行的列名【英文标题】:Select column name per row for max value in PySpark 【发布时间】:2019-10-16 18:50:57 【问题描述】:我有一个这样的数据框,只显示了两列,但是原始数据框中有很多列
data = [(("ID1", 3, 5)), (("ID2", 4, 12)), (("ID3", 8, 3))]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.show()
+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID1| 3| 5|
|ID2| 4| 12|
|ID3| 8| 3|
+---+----+----+
我想提取每行列的名称,它具有最大值。因此预期的输出是这样的
+---+----+----+-------+
| ID|colA|colB|Max_col|
+---+----+----+-------+
|ID1| 3| 5| colB|
|ID2| 4| 12| colB|
|ID3| 8| 3| colA|
+---+----+----+-------+
如果是 tie,colA 和 colB 的值相同,选择第一列。
如何在 pyspark 中实现这一点
【问题讨论】:
Scala/Spark dataframes: find the column name corresponding to the max的可能重复 how to get the name of column with maximum value in pyspark dataframe的可能重复 【参考方案1】:尝试以下方法:
from pyspark.sql import functions as F
data = [(("ID1", 3, 5)), (("ID2", 4, 12)), (("ID3", 8, 3))]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.withColumn('max_col',
F.when(F.col('colA') > F.col('colB'), 'colA').
otherwise('colB')).show()
产量:
+---+----+----+-------+
| ID|colA|colB|max_col|
+---+----+----+-------+
|ID1| 3| 5| colB|
|ID2| 4| 12| colB|
|ID3| 8| 3| colA|
+---+----+----+-------+
【讨论】:
嗨 Elior,如果我只有两列,该解决方案将有效,但是我有很多列 嗨哈德里德,很抱歉我错过了。 嗨哈德里德,对不起,我错过了。我提出的解决方案的架构是这样的:(对不起,我无法编写代码,您需要为此拥有 2.4 版本,而我没有): 1. 添加这样的列:df.withColumn('arr', F.array('col1', 'col2', ... , 'coln'))
2. 添加一列maxval
: withColumn('max_val', F.array_max('arr'))
3. 最后,使用map
函数(来自RDD),选择值等于列max_val
中的值的列。【参考方案2】:
有多种选择可以实现这一目标。我是一个提供示例,可以为休息提供提示-
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
from pyspark.sql import types as T
data = [(("ID1", 3, 5)), (("ID2", 4, 12)), (("ID3", 8, 3))]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.show()
+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID1| 3| 5|
|ID2| 4| 12|
|ID3| 8| 3|
+---+----+----+
#Below F.array creates an array of column name and value pair like [['colA', 3], ['colB', 5]] then F.explode break this array into rows like different column and value pair should be in different rows
df = df.withColumn(
"max_val",
F.explode(
F.array([
F.array([F.lit(cl), F.col(cl)]) for cl in df.columns[1:]
])
)
)
df.show()
+---+----+----+----------+
| ID|colA|colB| max_val|
+---+----+----+----------+
|ID1| 3| 5| [colA, 3]|
|ID1| 3| 5| [colB, 5]|
|ID2| 4| 12| [colA, 4]|
|ID2| 4| 12|[colB, 12]|
|ID3| 8| 3| [colA, 8]|
|ID3| 8| 3| [colB, 3]|
+---+----+----+----------+
#Then select columns so that column name and value should be in different columns
df = df.select(
"ID",
"colA",
"colB",
F.col("max_val").getItem(0).alias("col_name"),
F.col("max_val").getItem(1).cast(T.IntegerType()).alias("col_value"),
)
df.show()
+---+----+----+--------+---------+
| ID|colA|colB|col_name|col_value|
+---+----+----+--------+---------+
|ID1| 3| 5| colA| 3|
|ID1| 3| 5| colB| 5|
|ID2| 4| 12| colA| 4|
|ID2| 4| 12| colB| 12|
|ID3| 8| 3| colA| 8|
|ID3| 8| 3| colB| 3|
+---+----+----+--------+---------+
# Rank column values based on ID in desc order
df = df.withColumn(
"rank",
F.rank().over(W.partitionBy("ID").orderBy(F.col("col_value").desc()))
)
df.show()
+---+----+----+--------+---------+----+
| ID|colA|colB|col_name|col_value|rank|
+---+----+----+--------+---------+----+
|ID2| 4| 12| colB| 12| 1|
|ID2| 4| 12| colA| 4| 2|
|ID3| 8| 3| colA| 8| 1|
|ID3| 8| 3| colB| 3| 2|
|ID1| 3| 5| colB| 5| 1|
|ID1| 3| 5| colA| 3| 2|
+---+----+----+--------+---------+----+
#Finally Filter rank = 1 as max value have rank 1 because we ranked desc value
df.where("rank=1").show()
+---+----+----+--------+---------+----+
| ID|colA|colB|col_name|col_value|rank|
+---+----+----+--------+---------+----+
|ID2| 4| 12| colB| 12| 1|
|ID3| 8| 3| colA| 8| 1|
|ID1| 3| 5| colB| 5| 1|
+---+----+----+--------+---------+----+
其他选项是 -
在基础 df 上使用 UDF 并返回具有最大值的列名 在同一示例中,在将列名和值列而不是排名使用 group byID
设置为 max col_value
之后。然后加入之前的df。
【讨论】:
我们需要获取最大值,因此 rank = 1 不起作用? @Hardikguptarank=1
按我提供的降序排列。 ex 5 有 1 级,3 有 2 级
对于最终输出,在第一行,它将是 colB 对吗?
@Hardikgupta 更新了答案。问题是由于数据类型不匹配造成的。将此转换为整数类型F.col("max_val").getItem(1).cast(T.IntegerType()).alias("col_value")
并在顶部添加了对此的导入。
@Hardikgupta 如果这能解决您的问题,请接受答案。【参考方案3】:
您可以使用 RDD API 添加新列:
df.rdd.map(lambda r: r.asDict())\
.map(lambda r: Row(Max_col=max([i for i in r.items() if i[0] != 'ID'],
key=lambda kv: kv[1])[0], **r) )\
.toDF()
导致:
+---+-------+----+----+
| ID|Max_col|colA|colB|
+---+-------+----+----+
|ID1| colB| 3| 5|
|ID2| colB| 4| 12|
|ID3| colA| 8| 3|
+---+-------+----+----+
【讨论】:
【参考方案4】:您可以在每一行上使用UDF
进行逐行计算,并使用struct
将多列传递给udf。希望这会有所帮助。
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from operator import itemgetter
data = [(("ID1", 3, 5,78)), (("ID2", 4, 12,45)), (("ID3", 70, 3,67))]
df = spark.createDataFrame(data, ["ID", "colA", "colB","colC"])
df.show()
+---+----+----+----+
| ID|colA|colB|colC|
+---+----+----+----+
|ID1| 3| 5| 78|
|ID2| 4| 12| 45|
|ID3| 70| 3| 70|
+---+----+----+----+
cols = df.columns
# to get max of values in a row
maxcol = F.udf(lambda row: max(row), IntegerType())
maxDF = df.withColumn("maxval", maxcol(F.struct([df[x] for x in df.columns[1:]])))
maxDF.show()
+---+----+----+----+-------+
|ID |colA|colB|colC|Max_col|
+---+----+----+----+-------+
|ID1|3 |5 |78 |78 |
|ID2|4 |12 |45 |45 |
|ID3|70 |3 |67 |70 |
+---+----+----+----+-------+
# to get max of value & corresponding column name
schema=StructType([StructField('maxval',IntegerType()),StructField('maxval_colname',StringType())])
maxcol = F.udf(lambda row: max(row,key=itemgetter(0)), schema)
maxDF = df.withColumn('maxfield', maxcol(F.struct([F.struct(df[x],F.lit(x)) for x in df.columns[1:]]))).\
select(df.columns+['maxfield.maxval','maxfield.maxval_colname'])
+---+----+----+----+------+--------------+
| ID|colA|colB|colC|maxval|maxval_colname|
+---+----+----+----+------+--------------+
|ID1| 3 | 5 | 78 | 78 | colC |
|ID2| 4 | 12 | 45 | 45 | colC |
|ID3| 70 | 3 | 67 | 68 | colA |
+---+----+----+----+------+--------------+
【讨论】:
但是如果你想要列名呢? 意思是,你需要最大值的列名。? 确实如其他答案所述。 您可以在结构中创建值、列 (df[x],x) 的元组并在其上获取最大值。【参考方案5】:扩展 Suresh 所做的工作......返回适当的列名
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType, StringType
import numpy as np
data = [(("ID1", 3, 5,78)), (("ID2", 4, 12,45)), (("ID3", 68, 3,67))]
df = spark.createDataFrame(data, ["ID", "colA", "colB","colC"])
df.show()
cols = df.columns
maxcol = f.udf(lambda row: cols[row.index(max(row)) +1], StringType())
maxDF = df.withColumn("Max_col", maxcol(f.struct([df[x] for x in df.columns[1:]])))
maxDF.show(truncate=False)
+---+----+----+----+------+
|ID |colA|colB|colC|Max_col|
+---+----+----+----+------+
|ID1|3 |5 |78 |colC |
|ID2|4 |12 |45 |colC |
|ID3|68 |3 |67 |colA |
+---+----+----+----+------+
【讨论】:
以上是关于为 PySpark 中的最大值选择每行的列名的主要内容,如果未能解决你的问题,请参考以下文章
UDF 的性能改进 - 在 pyspark 中获取每行最小值的列名