spark将列值作为sql查询执行
Posted
技术标签:
【中文标题】spark将列值作为sql查询执行【英文标题】:spark execute column values as sql queries 【发布时间】:2021-04-06 01:50:57 【问题描述】:我有一个数据框,其中一列具有查询字符串,表示要使用其他列完成的计算。
val aSeq = Seq(
(1,10,10,10,"avg(c2)"),
(2,20,20,20,"sum(c1)"),
(3,30,30,30,"count(id)"),
(4,40,40,40,"(avg(c1)+avg(c2))"),
(5,50,50,50,"(avg(c3)+avg(c1))")
)
val df = aSeq.toDF("id","c1","c2","c3","calc")
df.show()
+---+---+---+---+--------------------+
| id| c1| c2| c3| calc|
+---+---+---+---+--------------------+
| 1| 10| 10| 10| avg(c2)|
| 2| 20| 20| 20| sum(c1)|
| 3| 30| 30| 30| count(id)|
| 4| 40| 40| 40| (avg(c1)+avg(c2))|
| 5| 50| 50| 50| (avg(c3)+avg(c1))|
+---+---+---+---+--------------------+
是否可以计算包含这些计算值的另一列?如果需要,我很乐意提供更多信息。感谢您的任何输入,指针...
【问题讨论】:
我不认为sum(avg(c1)+avg(c2))
是一个有效的例子,因为嵌套的聚合函数。我相信你只想写(avg(c1)+avg(c2))
【参考方案1】:
selectExpr 可用于评估数据集的 SQL 表达式:
df.select("calc").as[String].collect().foreach c =>
val result = df.selectExpr(c).as[Double].head()
println("%-17s --> %3.1f".format(c, result))
打印
avg(c2) --> 30,0
sum(c1) --> 150,0
count(id) --> 5,0
(avg(c1)+avg(c2)) --> 60,0
(avg(c3)+avg(c1)) --> 60,0
为了加快计算速度,可以将calc
列收集到parallel collection 中。结果可能会以不同的顺序打印:
df.select("calc").as[String].collect().par.foreach c =>
[same as above]
我理解calc
列中的操作应该对整个数据框进行操作,而不仅仅是对它所属的行进行操作的问题。
【讨论】:
谢谢。看来,对于大量行,collect()
可能会导致问题,但我会试一试。
@iamauser 要触发执行一条sql语句,你必须在驱动上有这条语句。您要执行多少(不同)语句?以上是关于spark将列值作为sql查询执行的主要内容,如果未能解决你的问题,请参考以下文章