使用 sparksql 和 spark dataframe 我们如何根据一行中的最小值找到 COLUMN NAME
Posted
技术标签:
【中文标题】使用 sparksql 和 spark dataframe 我们如何根据一行中的最小值找到 COLUMN NAME【英文标题】:using sparksql and spark dataframe How can we find the COLUMN NAME based on the minimum value in a row 【发布时间】:2018-11-15 06:44:29 【问题描述】:我有一个数据框 df 。它有 4 列
+-------+-------+-------+-------+
| dist1 | dist2 | dist3 | dist4 |
+-------+-------+-------+-------+
| 42 | 53 | 24 | 17 |
+-------+-------+-------+-------+
我想要的输出是
dist4
看起来很简单,但我没有找到使用数据框或 sparksql 查询的任何合适的解决方案
【问题讨论】:
【参考方案1】:你可以使用least
函数作为
select least(dist1,dist2,dist3,dist4) as min_dist
from yourTable;
对于相反的情况,可以使用greatest
。
编辑: 要检测列名,可以使用以下方法获取行
select inline(array(struct(42, 'dist1'), struct(53, 'dist2'),
struct(24, 'dist3'), struct(17, 'dist4') ))
42 dist1
53 dist2
24 dist3
17 dist4
然后可以应用min
函数得到dist4
【讨论】:
@Barbaros Özhan。我希望列名具有最小值。 minimum() 用于查找最小值,我想要列名【参考方案2】:Try this,
df.show
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| 1| 2| 3| 4|
| 5| 4| 3| 1|
+---+---+---+---+
val temp_df = df.columns.foldLeft(df) (acc: DataFrame, colName: String) => acc.withColumn(colName, concat(col(colName), lit(","+colName)))
val minval = udf((ar: Seq[String]) => ar.min.split(",")(1))
val result = temp_df.withColumn("least", split(concat_ws(":",x.columns.map(col(_)):_*),":")).withColumn("least_col", minval(col("least")))
result.show
+---+---+---+---+--------------------+---------+
| A| B| C| D| least|least_col|
+---+---+---+---+--------------------+---------+
|1,A|2,B|3,C|4,D|[1,A, 2,B, 3,C, 4,D]| A|
|5,A|4,B|3,C|1,D|[5,A, 4,B, 3,C, 1,D]| D|
+---+---+---+---+--------------------+---------+
【讨论】:
【参考方案3】:RDD 方式并且没有 udf()s。
scala> val df = Seq((1,2,3,4),(5,4,3,1)).toDF("A","B","C","D")
df: org.apache.spark.sql.DataFrame = [A: int, B: int ... 2 more fields]
scala> val df2 = df.withColumn("arr", array(df.columns.map(col(_)):_*))
df2: org.apache.spark.sql.DataFrame = [A: int, B: int ... 3 more fields]
scala> val rowarr = df.columns
rowarr: Array[String] = Array(A, B, C, D)
scala> val rdd1 = df2.rdd.map( x=> val p = x.getAs[WrappedArray[Int]]("arr").toArray; val q=rowarr(p.indexWhere(_==p.min));Row.merge(x,Row(q)) )
rdd1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[83] at map at <console>:47
scala> spark.createDataFrame(rdd1,df2.schema.add(StructField("mincol",StringType))).show
+---+---+---+---+------------+------+
| A| B| C| D| arr|mincol|
+---+---+---+---+------------+------+
| 1| 2| 3| 4|[1, 2, 3, 4]| A|
| 5| 4| 3| 1|[5, 4, 3, 1]| D|
+---+---+---+---+------------+------+
scala>
【讨论】:
【参考方案4】:你可以做类似的事情,
import org.apache.spark.sql.functions._
val cols = df.columns
val u1 = udf((s: Seq[Int]) => cols(s.zipWithIndex.min._2))
df.withColumn("res", u1(array("*")))
【讨论】:
我有一个疑问。我已经以其他方式完成了它,例如 df.first.toSeq.asInstanceOf[Seq[DoubleType]] 之后我使用了 if 语句进行基于 index 的比较。它给了我结果,但哪一个是更快和最好的解决方案?【参考方案5】:您可以访问行模式,从那里检索名称列表并按名称访问行值,然后以这种方式计算出来。
见:https://spark.apache.org/docs/2.3.2/api/scala/index.html#org.apache.spark.sql.Row
大概是这个样子
dataframe.map(
row =>
val schema = row.schema
val fieldNames:List[String] = ??? //extract names from schema
fieldNames.foldLeft(("", 0))(???) // retrieve field value using it's name and retain maximum
)
这将产生一个Dataset[String]
【讨论】:
以上是关于使用 sparksql 和 spark dataframe 我们如何根据一行中的最小值找到 COLUMN NAME的主要内容,如果未能解决你的问题,请参考以下文章
Spark:在 SparkSql 中使用 map 和 reduce