使用 sparksql 和 spark dataframe 我们如何根据一行中的最小值找到 COLUMN NAME

Posted

技术标签:

【中文标题】使用 sparksql 和 spark dataframe 我们如何根据一行中的最小值找到 COLUMN NAME【英文标题】:using sparksql and spark dataframe How can we find the COLUMN NAME based on the minimum value in a row 【发布时间】:2018-11-15 06:44:29 【问题描述】:

我有一个数据框 df 。它有 4 列

+-------+-------+-------+-------+  
| dist1 | dist2 | dist3 | dist4 |
+-------+-------+-------+-------+  
|  42   |  53   |  24   |  17   |
+-------+-------+-------+-------+  

我想要的输出是

dist4

看起来很简单,但我没有找到使用数据框或 sparksql 查询的任何合适的解决方案

【问题讨论】:

【参考方案1】:

你可以使用least函数作为

select least(dist1,dist2,dist3,dist4) as min_dist
  from yourTable;

对于相反的情况,可以使用greatest

编辑: 要检测列名,可以使用以下方法获取行

select inline(array(struct(42, 'dist1'), struct(53, 'dist2'), 
                    struct(24, 'dist3'), struct(17, 'dist4') ))

42  dist1
53  dist2
24  dist3
17  dist4 

然后可以应用min函数得到dist4

【讨论】:

@Barbaros Özhan。我希望列名具有最小值。 minimum() 用于查找最小值,我想要列名【参考方案2】:
Try this,

df.show
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  1|  2|  3|  4|
|  5|  4|  3|  1|
+---+---+---+---+

val temp_df = df.columns.foldLeft(df)  (acc: DataFrame, colName: String) => acc.withColumn(colName, concat(col(colName), lit(","+colName)))

val minval = udf((ar: Seq[String]) => ar.min.split(",")(1))

val result = temp_df.withColumn("least", split(concat_ws(":",x.columns.map(col(_)):_*),":")).withColumn("least_col", minval(col("least")))

result.show
+---+---+---+---+--------------------+---------+
|  A|  B|  C|  D|               least|least_col|
+---+---+---+---+--------------------+---------+
|1,A|2,B|3,C|4,D|[1,A, 2,B, 3,C, 4,D]|        A|
|5,A|4,B|3,C|1,D|[5,A, 4,B, 3,C, 1,D]|        D|
+---+---+---+---+--------------------+---------+

【讨论】:

【参考方案3】:

RDD 方式并且没有 udf()s。

scala> val df = Seq((1,2,3,4),(5,4,3,1)).toDF("A","B","C","D")
df: org.apache.spark.sql.DataFrame = [A: int, B: int ... 2 more fields]

scala> val df2 = df.withColumn("arr", array(df.columns.map(col(_)):_*))
df2: org.apache.spark.sql.DataFrame = [A: int, B: int ... 3 more fields]

scala>  val rowarr = df.columns
rowarr: Array[String] = Array(A, B, C, D)

scala> val rdd1 = df2.rdd.map( x=> val p = x.getAs[WrappedArray[Int]]("arr").toArray; val q=rowarr(p.indexWhere(_==p.min));Row.merge(x,Row(q)) )
rdd1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[83] at map at <console>:47

scala> spark.createDataFrame(rdd1,df2.schema.add(StructField("mincol",StringType))).show
+---+---+---+---+------------+------+
|  A|  B|  C|  D|         arr|mincol|
+---+---+---+---+------------+------+
|  1|  2|  3|  4|[1, 2, 3, 4]|     A|
|  5|  4|  3|  1|[5, 4, 3, 1]|     D|
+---+---+---+---+------------+------+


scala>

【讨论】:

【参考方案4】:

你可以做类似的事情,

import org.apache.spark.sql.functions._

val cols = df.columns
val u1 = udf((s: Seq[Int]) => cols(s.zipWithIndex.min._2))

df.withColumn("res", u1(array("*")))

【讨论】:

我有一个疑问。我已经以其他方式完成了它,例如 df.first.toSeq.asInstanceOf[Seq[DoubleType]] 之后我使用了 if 语句进行基于 index 的比较。它给了我结果,但哪一个是更快和最好的解决方案?【参考方案5】:

您可以访问行模式,从那里检索名称列表并按名称访问行值,然后以这种方式计算出来。

见:https://spark.apache.org/docs/2.3.2/api/scala/index.html#org.apache.spark.sql.Row

大概是这个样子

dataframe.map(
    row => 
        val schema = row.schema
        val fieldNames:List[String] =  ??? //extract names from schema
        fieldNames.foldLeft(("", 0))(???) // retrieve field value using it's name and retain maximum
    
)

这将产生一个Dataset[String]

【讨论】:

以上是关于使用 sparksql 和 spark dataframe 我们如何根据一行中的最小值找到 COLUMN NAME的主要内容,如果未能解决你的问题,请参考以下文章

Spark:在 SparkSql 中使用 map 和 reduce

使用 Java 的 Spark 和 Spark SQL 新手

大数据学习笔记:SparkSQL入门

大数据学习笔记:SparkSQL入门

SparkSQL详解

Apache Spark:SparkSQL的使用