spark sql中需要一次传递多个sql查询

Posted

技术标签:

【中文标题】spark sql中需要一次传递多个sql查询【英文标题】:need to pass multiple sql query at a time in spark sql 【发布时间】:2020-07-06 18:55:51 【问题描述】:

对于下面的输出,我想运行多个 sql 查询,如下面的代码所示,但是 spark 不支持多个 sql 语句,您能否建议一些其他解决方法,这将非常有帮助,谢谢:)

expected Output:-

Col_name    Max_val Min_value
Name          Null      Null
Age             15        5
height         100        8

CODE :-



  from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
df = sc.parallelize([ \
    Row(name='Alice', age=5, height=80), \
    Row(name='Kate', age=10, height=90), \
    Row(name='Brain', age=15, height=100)]).toDF()

df.createOrReplaceTempView("Test")

df3 = spark.sql("select max(name) as name ,max(age) as age,max(height) as height from Test" )
df4=df.selectExpr("stack(3,'name',bigint(name),'age',bigint(age),'height',bigint(height)) as (col_name,max_data)")
df5 = spark.sql("select min(name) as name ,min(age) as age,min(height) as height from Test" )
df6=df.selectExpr("stack(3,'name',bigint(name),'age',bigint(age),'height',bigint(height)) as (col_name,min_data)")
df7=df4.join(df6,['col_name'],'inner').groupBy("col_name").orderBy("col_name")
df7.show()

【问题讨论】:

【参考方案1】:

如果您不需要结果查询的完全相同的结构,您可以在同一步骤中简单地进行多个聚合(这也会更有效):

from pyspark.sql import Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
df = sc.parallelize([ \
    Row(name='Alice', age=5, height=80), \
    Row(name='Kate', age=10, height=90), \
    Row(name='Brain', age=15, height=100)]).toDF()
df2 = df.agg(
    F.max(F.col("height")).alias("max_height"),
    F.max(F.col("age")).alias("max_age"),
    F.min(F.col("height")).alias("min_height"),
    F.min(F.col("age")).alias("min_age")
)
df2.collect()

这给出了一个结果:[Row(max_height=100, max_age=15, min_height=80, min_age=5)]

要获得上述格式,您必须使用explode。

【讨论】:

【参考方案2】:

在 Scala 中,您可以通过 Futures API 实现这一点。然后你可以公开你的 Scala 像这样的:

for(q <- queries) 
  Future 
    spark.sql(q)
  
.map(Await.result(_, Duration("+Inf")) 

请注意,“+Inf”只是说明性的,不要使用 Inf,因为永远不会发生超时,您的代码可能会永远挂起。

这当然不支持 .show(),因为它会在 DataFrame 之上运行,在这里我假设查询是查询的集合。

然后您可以将其包装在 spark.ml.Transformer 中,并将查询列表作为参数传递。

然后您可以在 spark submit 时将您的 jar 传递给 pyspark。

最后你可以通过 spark._jvm 访问你的转换器。

这是一个很好的解决方法,我只是提出它,因为我知道这可行。

我能问一下为什么您的示例中的语句必须并行运行吗?这可能有助于找到更好的建议。

【讨论】:

以上是关于spark sql中需要一次传递多个sql查询的主要内容,如果未能解决你的问题,请参考以下文章

无法将变量传递给 pyspark 中的 spark sql 查询

Spark sql 查询优化

Spark Sql 与 Spark Dataframe API

spark sql日期间隔sql查询不起作用

一次sparksql问题排查记录

SQL查询中的条件语句检查作为参数传递的多个值(列表)