在Scala spark中实现动态字符串插值?

Posted

技术标签:

【中文标题】在Scala spark中实现动态字符串插值?【英文标题】:Achive dynamic string interpolation in Scala spark? 【发布时间】:2020-01-19 10:03:05 【问题描述】:

我有一个字符串,其中包含需要进入我预期数据帧的.agg 函数的函数。 我的数据数据框看起来像

val client = Seq((1,"A","D",10),(2,"A","D",5),(3,"B","C",56),(5,"B","D",67)).toDF("ID","Categ","subCat","Amnt")
+---+-----+------+----+
| ID|Categ|subCat|Amnt|
+---+-----+------+----+
|  1|    A|     D|  10|
|  2|    A|     D|   5|
|  3|    B|     C|  56|
|  5|    B|     D|  67|
+---+-----+------+----+

所以我试图插入这个刺痛

val str= "s"$count(ID) as Total,$sum(Amnt) as amt""

我想实现这个作为输出

client.groupBy("Categ","subCat").agg(sum("Amnt") as "amt",count("ID") as "Total").show()
+-----+------+---+-----+
|Categ|subCat|amt|Total|
+-----+------+---+-----+
|    B|     C| 56|    1|
|    A|     D| 15|    2|
|    B|     D| 67|    1|
+-----+------+---+-----+

我试过了

 client.groupBy("Categ","subCat").agg(s"$str").show()

出现错误

> error: overloaded method value agg with alternatives:  

(expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame (exprs: java.util.Map[String,String])org.apache.spark.sql.DataFrame (表达式: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame (aggExpr: (String, String),aggExprs: (String, String)*)org.apache.spark.sql.DataFrame 不能应用于(String)

我也试过expr

    val str="sum(Amnt) as amt"
    client.groupBy("Categ","subCat").agg(expr(str)).show()
 this return the desired outcome
    +-----+------+---+
    |Categ|subCat|amt|
    +-----+------+---+
    |    B|     C| 56|
    |    A|     D| 15|
    |    B|     D| 67|
    +-----+------+---+

但是当我再次尝试时 val str="sum(Amnt) as amt,count(ID) as ID_tot"

    client.groupBy("Categ","subCat").agg(expr(str)).show()
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input ',' expecting <EOF>(line 1, pos 16)

【问题讨论】:

您正在尝试在此处混合使用 Spark SQL 和 Dataframe API。这是不可能的。如果您提到的要求严格要求字符串插值,那么您必须选择纯 Spark SQL 解决方案,即select count(ID) as Total, sum(Amnt) as amt from client group by Categ ,subCat @Alexandros 我认为这是我的备用计划。 如您所见,here agg 有 3 个重载 1. agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame 2. agg(exprs: Map[String, String]): DataFrame 3. agg(expr: Column, exprs: Column*): DataFrame 它们都不接受字符串。所以你可以像上面显示的那样做df.groupBy("Categ","subCat").agg("Amnt" -&gt; "sum", "ID" -&gt; "count")df.groupBy("Categ","subCat").agg(Map("Amnt" -&gt; "sum", "ID" -&gt; "count"))或SQL 【参考方案1】:

你可以使用不同的API来实现你想要的

import org.apache.spark.sql.functions._
client
  .groupBy("Categ", "subCat")
  .agg(
    sum("Amnt").as("Amnt"), 
    count(lit("1")).as("Total"))
  .show()

或者,使用完整的 SparkSQL,但如果您尝试注入用户的输入,我不鼓励这种模式。

spark.sql(s"select Categ, subCat, $str from client group by Categ, subCat")

【讨论】:

聚合函数是字符串本身的一部分 val str= "s"$count(ID) as Total,$sum(Amnt) as amt"" @KalpishSinghal 好吧,没有注意到你在这里尝试做什么。我已经编辑了我的答案 你是对的,这样做我会实现我想要的这份工作,但我希望这是一个可配置的字符串,所以而不是“ sum("Amt") ,Count("ID") ”我可以在这个函数中传递一个字符串 2 说“Avg("Amnt").as("avg_amt")”这就是我正在寻找插值的地方【参考方案2】:

有点粗略的解决方案:除以, 并分别调用expr

val str="sum(Amnt) as amt,count(ID) as ID_tot"
val (first, rest) = str.split(",").map(expr).splitAt(1)
client.groupBy("Categ","subCat").agg(first, rest: _*)

如果, 可以是表达式的一部分(例如,在字符串文字中),情况会变得更糟:尝试使用expr 解析它,捕获ParseException 并查看它的结束位置?确实应该有更直接的方法,但我不知道。

【讨论】:

【参考方案3】:

你可以通过使用字符串列表作为表达式来达到同样的效果-

val str=List("sum(Amnt) as amt,count(ID) as ID_tot") ds.selectExpr(str:_*)

【讨论】:

以上是关于在Scala spark中实现动态字符串插值?的主要内容,如果未能解决你的问题,请参考以下文章

Scala - 如何在 Spark 的 map 函数中实现 Try

如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]

scala中的字符串插值

scala中的字符串插值

在 spark 中实现 informatica 逻辑

如何在 jOOQ 中使用 Scala 的字符串插值?