为啥即使使用限制命令访问结果,SPARK \PYSPARK 也会计算所有内容?

Posted

技术标签:

【中文标题】为啥即使使用限制命令访问结果,SPARK \\PYSPARK 也会计算所有内容?【英文标题】:Why does SPARK \PYSPARK compute everything even if result is accessed with limit command?为什么即使使用限制命令访问结果,SPARK \PYSPARK 也会计算所有内容? 【发布时间】:2016-06-01 09:03:12 【问题描述】:

我有 mysql 表存储在 dataframe_mysql 中

dataframe_mysql = sqlContext.read.format("jdbc").options(...
dataframe_mysql.registerTempTable('dataf')
groupedtbl=sqlContext.sql("""SELECT job_seq_id,first(job_dcr_content) as firststage,last(job_dcr_content) as laststage,
                          first(from_stage) as source, last(from_stage) as target , count(jid) as noofstages from dataf group by job_seq_id having count(jid)>1""" )



from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
func1 = udf(fu1, StringType())
func2= udf(fu2, StringType())
res1=groupedtbl.withColumn('dcol',func1(groupedtbl.firststage,groupedtbl.lastage,groupedtbl.job_seq_id))
res2=res1.withColumn('lcol',func2(res1.dcol,res1.job_seq_id))

对于上面的代码,我看到即使我发出了一个限制命令:

lb=res2.limit(2).collect()

或以下命令仅获取一条记录的结果:

labels.filter(res2.job_seq_id==5843064)

它不仅在第一个查询中获得两个结果或在第二个查询中获得单个结果,它还对其他行进行了大量不必要的计算,即使只需要两行,也会浪费时间。我可以从内部日志中看到这一点,即使只是获取两行,其计算 100 行,然后从中检索两个结果行。我虽然 DAG 机制应该可以处理这个问题,但它似乎没有,我在这个观察中错了吗?

【问题讨论】:

【参考方案1】:

这里有很多不同的问题。有些与您使用的数据源有关,与查询有关,最后还有一些是通过使用带有 Spark

一步一步:

对于 JDBC 数据源,只有简单的谓词被下推到源。它仅表示主要 WHERE 子句内的条件(不在 HAVING 或其他计算字段内)。包括聚合和限制在内的其他一切都发生在 Spark 内部(请参阅Does spark predicate pushdown work with JDBC?、More than one hour to execute pyspark.sql.DataFrame.take(4))。

没有显式分区 Spark 没有关于数据分布的先验知识。这意味着任何需要数据聚合的代码都必须访问所有记录。因此,带有聚合的limit 子句只能在执行计划中聚合后才存在。这意味着:

res2.limit(2)

无法优化。

Spark

from pyspark.sql.functions import col, udf

options = ...
df = sqlContext.read.format("jdbc").options(**options).load()

df.printSchema()
## root
##  |-- x: integer (nullable = true)
##  |-- y: integer (nullable = true)

如果没有BatchPythonEvaluation,可以看到谓词被下推

df.groupBy("x").sum("y").where(col("x") > 2).explain()
## == Physical Plan ==
## TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Final,isDistinct=false)], output=[x#182,sum(y)#192L])
## +- TungstenExchange hashpartitioning(x#182,200), None
##    +- TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Partial,isDistinct=false)], output=[x#182,sum#197L])
##       +- Filter (x#182 > 2)
##          +- Scan JDBCRelation(...,point,[Lorg.apache.spark.Partition;@61ee5d1a,...)[x#182,y#183] PushedFilters: [GreaterThan(x,2)]

但不是在添加UDF调用时,即使没有使用输出

identity = udf(lambda x: x)

df.groupBy("x").sum("y").withColumn("foo", identity(col("x"))).where(col("x") > 2).explain()
== Physical Plan ==
## Project [x#182,sum(y)#214L,pythonUDF#216 AS foo#215]
## +- Filter (x#182 > 2)
##    +- !BatchPythonEvaluation PythonUDF#<lambda>(x#182), [x#182,sum(y)#214L,pythonUDF#216]
##       +- TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Final,isDistinct=false)], output=[x#182,sum(y)#214L])
##          +- TungstenExchange hashpartitioning(x#182,200), None
##             +- TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Partial,isDistinct=false)], output=[x#182,sum#219L])
##                +- Scan JDBCRelation(...,point,[Lorg.apache.spark.Partition;@61ee5d1a,...)[x#182,y#183]

此行为已在 Spark 2.0.0 中进行了优化。

【讨论】:

谢谢,能否请您回答相关问题:***.com/questions/37584185/… @stackit 如果我建议您首先开始accepting the answers :) 您有很多未解决的问题,这些问题的答案很好。关于链接的问题,我不清楚您要达到什么目的。 哦,我已经接受了,您可以在那里评论我需要解释的更多内容,谢谢!

以上是关于为啥即使使用限制命令访问结果,SPARK \PYSPARK 也会计算所有内容?的主要内容,如果未能解决你的问题,请参考以下文章

Spark DAG 与“withColumn”和“select”不同

为啥我的 laravel 集合返回 500 错误,即使不是在我使用 dd() 检查结果时?

为啥'get_json_object'在spark和sql工具中运行时返回不同的结果

为啥不能在 UDF 中访问数据框? [Apache Spark Scala] [重复]

为啥即使使用 setlocal enabledelayedexpansion 也无法访问 for 循环(批处理文件)中的局部变量? [复制]

即使以管理员身份运行,在 Windows 错误 5(拒绝访问)上运行 Spark