spark将列值作为sql查询执行

Posted

技术标签:

【中文标题】spark将列值作为sql查询执行【英文标题】:spark execute column values as sql queries 【发布时间】:2021-04-06 01:50:57 【问题描述】:

我有一个数据框,其中一列具有查询字符串,表示要使用其他列完成的计算。

val aSeq = Seq(
(1,10,10,10,"avg(c2)"), 
(2,20,20,20,"sum(c1)"), 
(3,30,30,30,"count(id)"),
(4,40,40,40,"(avg(c1)+avg(c2))"), 
(5,50,50,50,"(avg(c3)+avg(c1))")
)

val df = aSeq.toDF("id","c1","c2","c3","calc")
df.show()

+---+---+---+---+--------------------+
| id| c1| c2| c3|                calc|
+---+---+---+---+--------------------+
|  1| 10| 10| 10|             avg(c2)|
|  2| 20| 20| 20|             sum(c1)|
|  3| 30| 30| 30|           count(id)|
|  4| 40| 40| 40|   (avg(c1)+avg(c2))|
|  5| 50| 50| 50|   (avg(c3)+avg(c1))|
+---+---+---+---+--------------------+

是否可以计算包含这些计算值的另一列?如果需要,我很乐意提供更多信息。感谢您的任何输入,指针...

【问题讨论】:

我不认为sum(avg(c1)+avg(c2)) 是一个有效的例子,因为嵌套的聚合函数。我相信你只想写(avg(c1)+avg(c2)) 【参考方案1】:

selectExpr 可用于评估数据集的 SQL 表达式:

df.select("calc").as[String].collect().foreach c =>
  val result = df.selectExpr(c).as[Double].head()
  println("%-17s --> %3.1f".format(c, result))

打印

avg(c2)           --> 30,0
sum(c1)           --> 150,0
count(id)         --> 5,0
(avg(c1)+avg(c2)) --> 60,0
(avg(c3)+avg(c1)) --> 60,0

为了加快计算速度,可以将calc 列收集到parallel collection 中。结果可能会以不同的顺序打印:

df.select("calc").as[String].collect().par.foreach c =>
 [same as above]

我理解calc 列中的操作应该对整个数据框进行操作,而不仅仅是对它所属的行进行操作的问题。

【讨论】:

谢谢。看来,对于大量行,collect() 可能会导致问题,但我会试一试。 @iamauser 要触发执行一条sql语句,你必须在驱动上有这条语句。您要执行多少(不同)语句?

以上是关于spark将列值作为sql查询执行的主要内容,如果未能解决你的问题,请参考以下文章

将列值显式设置为空 SQL Developer

将列值显式设置为null SQL Developer

SQL:在 Chartio 中动态地将列转换为行

SQL:将列值链接到列名以满足某些条件

如何仅将列值插入前一千行

SQL 将列值拆分为 Netezza 中的行