在Scala spark中实现动态字符串插值?
Posted
技术标签:
【中文标题】在Scala spark中实现动态字符串插值?【英文标题】:Achive dynamic string interpolation in Scala spark? 【发布时间】:2020-01-19 10:03:05 【问题描述】:我有一个字符串,其中包含需要进入我预期数据帧的.agg
函数的函数。
我的数据数据框看起来像
val client = Seq((1,"A","D",10),(2,"A","D",5),(3,"B","C",56),(5,"B","D",67)).toDF("ID","Categ","subCat","Amnt")
+---+-----+------+----+
| ID|Categ|subCat|Amnt|
+---+-----+------+----+
| 1| A| D| 10|
| 2| A| D| 5|
| 3| B| C| 56|
| 5| B| D| 67|
+---+-----+------+----+
所以我试图插入这个刺痛
val str= "s"$count(ID) as Total,$sum(Amnt) as amt""
我想实现这个作为输出
client.groupBy("Categ","subCat").agg(sum("Amnt") as "amt",count("ID") as "Total").show()
+-----+------+---+-----+
|Categ|subCat|amt|Total|
+-----+------+---+-----+
| B| C| 56| 1|
| A| D| 15| 2|
| B| D| 67| 1|
+-----+------+---+-----+
我试过了
client.groupBy("Categ","subCat").agg(s"$str").show()
出现错误
> error: overloaded method value agg with alternatives:
(expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame (exprs: java.util.Map[String,String])org.apache.spark.sql.DataFrame (表达式: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame (aggExpr: (String, String),aggExprs: (String, String)*)org.apache.spark.sql.DataFrame 不能应用于(String)
我也试过expr
val str="sum(Amnt) as amt"
client.groupBy("Categ","subCat").agg(expr(str)).show()
this return the desired outcome
+-----+------+---+
|Categ|subCat|amt|
+-----+------+---+
| B| C| 56|
| A| D| 15|
| B| D| 67|
+-----+------+---+
但是当我再次尝试时
val str="sum(Amnt) as amt,count(ID) as ID_tot"
client.groupBy("Categ","subCat").agg(expr(str)).show()
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input ',' expecting <EOF>(line 1, pos 16)
【问题讨论】:
您正在尝试在此处混合使用 Spark SQL 和 Dataframe API。这是不可能的。如果您提到的要求严格要求字符串插值,那么您必须选择纯 Spark SQL 解决方案,即select count(ID) as Total, sum(Amnt) as amt from client group by Categ ,subCat
@Alexandros 我认为这是我的备用计划。
如您所见,here agg
有 3 个重载 1. agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
2. agg(exprs: Map[String, String]): DataFrame
3. agg(expr: Column, exprs: Column*): DataFrame
它们都不接受字符串。所以你可以像上面显示的那样做df.groupBy("Categ","subCat").agg("Amnt" -> "sum", "ID" -> "count")
或df.groupBy("Categ","subCat").agg(Map("Amnt" -> "sum", "ID" -> "count"))
或SQL
【参考方案1】:
你可以使用不同的API来实现你想要的
import org.apache.spark.sql.functions._
client
.groupBy("Categ", "subCat")
.agg(
sum("Amnt").as("Amnt"),
count(lit("1")).as("Total"))
.show()
或者,使用完整的 SparkSQL,但如果您尝试注入用户的输入,我不鼓励这种模式。
spark.sql(s"select Categ, subCat, $str from client group by Categ, subCat")
【讨论】:
聚合函数是字符串本身的一部分 val str= "s"$count(ID) as Total,$sum(Amnt) as amt"" @KalpishSinghal 好吧,没有注意到你在这里尝试做什么。我已经编辑了我的答案 你是对的,这样做我会实现我想要的这份工作,但我希望这是一个可配置的字符串,所以而不是“ sum("Amt") ,Count("ID") ”我可以在这个函数中传递一个字符串 2 说“Avg("Amnt").as("avg_amt")”这就是我正在寻找插值的地方【参考方案2】:有点粗略的解决方案:除以,
并分别调用expr
:
val str="sum(Amnt) as amt,count(ID) as ID_tot"
val (first, rest) = str.split(",").map(expr).splitAt(1)
client.groupBy("Categ","subCat").agg(first, rest: _*)
如果,
可以是表达式的一部分(例如,在字符串文字中),情况会变得更糟:尝试使用expr
解析它,捕获ParseException
并查看它的结束位置?确实应该有更直接的方法,但我不知道。
【讨论】:
【参考方案3】:你可以通过使用字符串列表作为表达式来达到同样的效果-
val str=List("sum(Amnt) as amt,count(ID) as ID_tot")
ds.selectExpr(str:_*)
【讨论】:
以上是关于在Scala spark中实现动态字符串插值?的主要内容,如果未能解决你的问题,请参考以下文章
Scala - 如何在 Spark 的 map 函数中实现 Try