spark sql中需要一次传递多个sql查询
Posted
技术标签:
【中文标题】spark sql中需要一次传递多个sql查询【英文标题】:need to pass multiple sql query at a time in spark sql 【发布时间】:2020-07-06 18:55:51 【问题描述】:对于下面的输出,我想运行多个 sql 查询,如下面的代码所示,但是 spark 不支持多个 sql 语句,您能否建议一些其他解决方法,这将非常有帮助,谢谢:)
expected Output:-
Col_name Max_val Min_value
Name Null Null
Age 15 5
height 100 8
CODE :-
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
df = sc.parallelize([ \
Row(name='Alice', age=5, height=80), \
Row(name='Kate', age=10, height=90), \
Row(name='Brain', age=15, height=100)]).toDF()
df.createOrReplaceTempView("Test")
df3 = spark.sql("select max(name) as name ,max(age) as age,max(height) as height from Test" )
df4=df.selectExpr("stack(3,'name',bigint(name),'age',bigint(age),'height',bigint(height)) as (col_name,max_data)")
df5 = spark.sql("select min(name) as name ,min(age) as age,min(height) as height from Test" )
df6=df.selectExpr("stack(3,'name',bigint(name),'age',bigint(age),'height',bigint(height)) as (col_name,min_data)")
df7=df4.join(df6,['col_name'],'inner').groupBy("col_name").orderBy("col_name")
df7.show()
【问题讨论】:
【参考方案1】:如果您不需要结果查询的完全相同的结构,您可以在同一步骤中简单地进行多个聚合(这也会更有效):
from pyspark.sql import Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
df = sc.parallelize([ \
Row(name='Alice', age=5, height=80), \
Row(name='Kate', age=10, height=90), \
Row(name='Brain', age=15, height=100)]).toDF()
df2 = df.agg(
F.max(F.col("height")).alias("max_height"),
F.max(F.col("age")).alias("max_age"),
F.min(F.col("height")).alias("min_height"),
F.min(F.col("age")).alias("min_age")
)
df2.collect()
这给出了一个结果:[Row(max_height=100, max_age=15, min_height=80, min_age=5)]
要获得上述格式,您必须使用explode。
【讨论】:
【参考方案2】:在 Scala 中,您可以通过 Futures API 实现这一点。然后你可以公开你的 Scala 像这样的:
for(q <- queries)
Future
spark.sql(q)
.map(Await.result(_, Duration("+Inf"))
请注意,“+Inf”只是说明性的,不要使用 Inf,因为永远不会发生超时,您的代码可能会永远挂起。
这当然不支持 .show(),因为它会在 DataFrame 之上运行,在这里我假设查询是查询的集合。
然后您可以将其包装在 spark.ml.Transformer 中,并将查询列表作为参数传递。
然后您可以在 spark submit 时将您的 jar 传递给 pyspark。
最后你可以通过 spark._jvm 访问你的转换器。
这是一个很好的解决方法,我只是提出它,因为我知道这可行。
我能问一下为什么您的示例中的语句必须并行运行吗?这可能有助于找到更好的建议。
【讨论】:
以上是关于spark sql中需要一次传递多个sql查询的主要内容,如果未能解决你的问题,请参考以下文章
无法将变量传递给 pyspark 中的 spark sql 查询