spark是不是优化了pyspark中相同但独立的DAG?

Posted

技术标签:

【中文标题】spark是不是优化了pyspark中相同但独立的DAG?【英文标题】:Does spark optimize identical but independent DAGs in pyspark?spark是否优化了pyspark中相同但独立的DAG? 【发布时间】:2019-03-13 17:57:53 【问题描述】:

考虑以下 pyspark 代码

def transformed_data(spark):
    df = spark.read.json('data.json')
    df = expensive_transformation(df)  # (A)    
    return df


df1 = transformed_data(spark)
df = transformed_data(spark)

df1 = foo_transform(df1)
df = bar_transform(df)

return df.join(df1)

我的问题是:transformed_data 上定义为 (A) 的操作是否在 final_view 中进行了优化,从而只执行一次?

请注意,此代码不等同于

df1 = transformed_data(spark)
df = df1

df1 = foo_transform(df1)
df = bar_transform(df)

df.join(df1)

(至少从 Python 的角度来看,在这种情况下是 id(df1) = id(df)

更广泛的问题是:在优化两个相等的 DAG 时,Spark 会考虑什么:DAG(由它们的边和节点定义)是否相等,或者它们的对象 ID (df = df1) 是否相等?

【问题讨论】:

P.S.这个例子可以不用这样的 fork+join 来写,但重点是优化。 【参考方案1】:

有点。它依赖于 Spark 有足够的信息来推断依赖关系。

例如,我按照描述复制了您的示例:

from pyspark.sql.functions import hash
def f(spark, filename):
    df=spark.read.csv(filename)
    df2=df.select(hash('_c1').alias('hashc2'))
    df3=df2.select(hash('hashc2').alias('hashc3'))
    df4=df3.select(hash('hashc3').alias('hashc4'))
    return df4

filename = 'some-valid-file.csv'
df_a = f(spark, filename)
df_b = f(spark, filename)
assert df_a != df_b

df_joined = df_a.join(df_b, df_a.hashc4==df_b.hashc4, how='left')

如果我使用 df_joined.explain(extended=True) 解释这个结果数据框,我会看到以下四个计划:

== Parsed Logical Plan ==
Join LeftOuter, (hashc4#20 = hashc4#42)
:- Project [hash(hashc3#18, 42) AS hashc4#20]
:  +- Project [hash(hashc2#16, 42) AS hashc3#18]
:     +- Project [hash(_c1#11, 42) AS hashc2#16]
:        +- Relation[_c0#10,_c1#11,_c2#12] csv
+- Project [hash(hashc3#40, 42) AS hashc4#42]
   +- Project [hash(hashc2#38, 42) AS hashc3#40]
      +- Project [hash(_c1#33, 42) AS hashc2#38]
         +- Relation[_c0#32,_c1#33,_c2#34] csv
== Analyzed Logical Plan ==
hashc4: int, hashc4: int
Join LeftOuter, (hashc4#20 = hashc4#42)
:- Project [hash(hashc3#18, 42) AS hashc4#20]
:  +- Project [hash(hashc2#16, 42) AS hashc3#18]
:     +- Project [hash(_c1#11, 42) AS hashc2#16]
:        +- Relation[_c0#10,_c1#11,_c2#12] csv
+- Project [hash(hashc3#40, 42) AS hashc4#42]
   +- Project [hash(hashc2#38, 42) AS hashc3#40]
      +- Project [hash(_c1#33, 42) AS hashc2#38]
         +- Relation[_c0#32,_c1#33,_c2#34] csv
== Optimized Logical Plan ==
Join LeftOuter, (hashc4#20 = hashc4#42)
:- Project [hash(hash(hash(_c1#11, 42), 42), 42) AS hashc4#20]
:  +- Relation[_c0#10,_c1#11,_c2#12] csv
+- Project [hash(hash(hash(_c1#33, 42), 42), 42) AS hashc4#42]
   +- Relation[_c0#32,_c1#33,_c2#34] csv
== Physical Plan ==
SortMergeJoin [hashc4#20], [hashc4#42], LeftOuter
:- *(2) Sort [hashc4#20 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(hashc4#20, 200)
:     +- *(1) Project [hash(hash(hash(_c1#11, 42), 42), 42) AS hashc4#20]
:        +- *(1) FileScan csv [_c1#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file: some-valid-file.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c1:string>
+- *(4) Sort [hashc4#42 ASC NULLS FIRST], false, 0
   +- ReusedExchange [hashc4#42], Exchange hashpartitioning(hashc4#20, 200)

上面的 physical plan 只读取 CSV 一次并重新使用所有计算,因为 Spark 检测到两个 FileScans 是相同的(即 Spark 知道它们不是独立的)。

现在考虑我是否将 read.csv 替换为手工制作的独立但相同的 RDD。

from pyspark.sql.functions import hash
def g(spark):
    df=spark.createDataFrame([('a', 'a'), ('b', 'b'), ('c', 'c')], ["_c1", "_c2"])
    df2=df.select(hash('_c1').alias('hashc2'))
    df3=df2.select(hash('hashc2').alias('hashc3'))
    df4=df3.select(hash('hashc3').alias('hashc4'))
    return df4

df_c = g(spark)
df_d = g(spark)
df_joined = df_c.join(df_d, df_c.hashc4==df_d.hashc4, how='left')

在这种情况下,Spark 的物理计划会扫描两个不同的 RDD。这是运行df_joined.explain(extended=True) 确认的输出。

== Parsed Logical Plan ==
Join LeftOuter, (hashc4#8 = hashc4#18)
:- Project [hash(hashc3#6, 42) AS hashc4#8]
:  +- Project [hash(hashc2#4, 42) AS hashc3#6]
:     +- Project [hash(_c1#0, 42) AS hashc2#4]
:        +- LogicalRDD [_c1#0, _c2#1], false
+- Project [hash(hashc3#16, 42) AS hashc4#18]
   +- Project [hash(hashc2#14, 42) AS hashc3#16]
      +- Project [hash(_c1#10, 42) AS hashc2#14]
         +- LogicalRDD [_c1#10, _c2#11], false

== Analyzed Logical Plan ==
hashc4: int, hashc4: int
Join LeftOuter, (hashc4#8 = hashc4#18)
:- Project [hash(hashc3#6, 42) AS hashc4#8]
:  +- Project [hash(hashc2#4, 42) AS hashc3#6]
:     +- Project [hash(_c1#0, 42) AS hashc2#4]
:        +- LogicalRDD [_c1#0, _c2#1], false
+- Project [hash(hashc3#16, 42) AS hashc4#18]
   +- Project [hash(hashc2#14, 42) AS hashc3#16]
      +- Project [hash(_c1#10, 42) AS hashc2#14]
         +- LogicalRDD [_c1#10, _c2#11], false

== Optimized Logical Plan ==
Join LeftOuter, (hashc4#8 = hashc4#18)
:- Project [hash(hash(hash(_c1#0, 42), 42), 42) AS hashc4#8]
:  +- LogicalRDD [_c1#0, _c2#1], false
+- Project [hash(hash(hash(_c1#10, 42), 42), 42) AS hashc4#18]
   +- LogicalRDD [_c1#10, _c2#11], false

== Physical Plan ==
SortMergeJoin [hashc4#8], [hashc4#18], LeftOuter
:- *(2) Sort [hashc4#8 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(hashc4#8, 200)
:     +- *(1) Project [hash(hash(hash(_c1#0, 42), 42), 42) AS hashc4#8]
:        +- Scan ExistingRDD[_c1#0,_c2#1]
+- *(4) Sort [hashc4#18 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(hashc4#18, 200)
      +- *(3) Project [hash(hash(hash(_c1#10, 42), 42), 42) AS hashc4#18]
         +- Scan ExistingRDD[_c1#10,_c2#11]

这并不是 PySpark 特有的行为。

【讨论】:

所以,如果我理解您的回答:当从 X 读取源代码时,它确实会优化。当源是动态创建时,它不会。换句话说,它使用“DAG(由它们的边和节点定义)是否相等” 对。 Spark DAG 具有边(操作)和节点(RDD)。如果两个 DAG 不是从一个公共节点 (RDD) 派生的,那么即使它们“相同”,它们也无法被优化。在第一种情况下,源 RDD 是使用相同的 FileScan 派生的,并且重复的操作被优化掉了。在第二种情况下,Spark 不知道可以将两个 RDD 视为一个。

以上是关于spark是不是优化了pyspark中相同但独立的DAG?的主要内容,如果未能解决你的问题,请参考以下文章

没有 spark-submit 的 Exec pyspark 独立脚本

在 pyspark 中处理大数据的优化

Spark 和 PySpark 之间是不是存在功能奇偶校验

PySpark - 获取具有相同值的数组元素的数量

独立集群 + Docker 上的 PySpark 性能不佳

安装 Spark 问题。无法使用 pyspark 打开 IPython Notebook