Broadcast dataframe is not removing shuffle phase (few rows of data) [duplicate]
Posted: 2020-01-29 03:37:03

I am learning Spark on a Databricks Community Edition notebook and created sample data with just a few rows. Since the data is so small, there should be no exchange (shuffle) stage in the query plan. I also tried broadcasting, but I still see the exchange stage. Do these configurations not work on a Community Edition notebook?
import org.apache.spark.sql.functions.{col, regexp_replace}
val authorBook = sc.parallelize(Seq(("a1", "b1"), ("a1", "b2"), ("a2", "b3"), ("a3", "b4")))
val schemaColumn = Seq("author", "book")
val df = authorBook.toDF(schemaColumn: _*)
val bookSold = sc.parallelize(Seq(("b1", 100), ("b2", 500), ("b3", 400), ("b4", 500)))
val bookSchema = Seq("book", "sold_copy")
val dfBook = bookSold.toDF(bookSchema: _*)
// val totalBookSold = df.repartition(4, col("book")).join(dfBook.repartition(4, col("book")), "book")
sc.broadcast(dfBook)
val totalBookSold = df.join(dfBook , "book")
totalBookSold.explain(true)
The query plan is the same with and without the broadcast:
== Physical Plan ==
*(3) Project [book#698, author#697, sold_copy#708]
+- *(3) SortMergeJoin [book#698], [book#707], Inner
:- Sort [book#698 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(book#698, 200), [id=#2071]
: +- *(1) Project [_1#694 AS author#697, _2#695 AS book#698]
: +- *(1) Filter isnotnull(_2#695)
: +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#694, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#695]
: +- Scan[obj#693]
+- Sort [book#707 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(book#707, 200), [id=#2079]
+- *(2) Project [_1#704 AS book#707, _2#705 AS sold_copy#708]
+- *(2) Filter isnotnull(_1#704)
+- *(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#704, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#705]
+- Scan[obj#703]
This link solved my problem:
Broadcast not happening while joining dataframes in Spark 1.6
Comments:

Does this answer your question? How to access broadcasted DataFrame in Spark. No, the two questions are different: the linked question is about how to use a broadcast DF, while mine is about query optimization, since broadcasting a small DataFrame is the widely advertised way to avoid a shuffle.

Answer 1:

You are probably seeing this plan because the broadcast is not actually used during the join.
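The likely root cause, as a hedged sketch: sc.broadcast creates a broadcast variable for use inside closures and UDFs, and the Catalyst optimizer never consults it when planning a join. The planner-level hint is the separate broadcast function from org.apache.spark.sql.functions:

// sc.broadcast wraps a value for use inside closures and UDFs; the Catalyst
// planner never looks at it, so the join still plans a SortMergeJoin with
// two exchanges.
val bcVar = sc.broadcast(dfBook)   // Broadcast[DataFrame]: available to tasks, ignored by the optimizer

// broadcast() from org.apache.spark.sql.functions attaches a hint to the
// logical plan; this is what turns the join into a BroadcastHashJoin.
import org.apache.spark.sql.functions.broadcast
val hinted = broadcast(dfBook)     // same data, now carrying a broadcast hint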
Try the following:
val totalBookSold = df.join(broadcast(dfBook), "book")
totalBookSold.explain(true)
You should see the hint carried through the logical plans, and a difference in the physical plan.
Similar to this:
== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(book))
:- Project [_1#3 AS author#6, _2#4 AS book#7]
: +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
: +- ExternalRDD [obj#2]
+- ResolvedHint (broadcast)
+- Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#13, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#14]
+- ExternalRDD [obj#12]
== Analyzed Logical Plan ==
book: string, author: string, sold_copy: int
Project [book#7, author#6, sold_copy#17]
+- Join Inner, (book#7 = book#16)
:- Project [_1#3 AS author#6, _2#4 AS book#7]
: +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
: +- ExternalRDD [obj#2]
+- ResolvedHint (broadcast)
+- Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#13, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#14]
+- ExternalRDD [obj#12]
== Optimized Logical Plan ==
Project [book#7, author#6, sold_copy#17]
+- Join Inner, (book#7 = book#16)
:- Project [_1#3 AS author#6, _2#4 AS book#7]
: +- Filter isnotnull(_2#4)
: +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
: +- ExternalRDD [obj#2]
+- ResolvedHint (broadcast)
+- Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
+- Filter isnotnull(_1#13)
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#13, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#14]
+- ExternalRDD [obj#12]
== Physical Plan ==
*(2) Project [book#7, author#6, sold_copy#17]
+- *(2) BroadcastHashJoin [book#7], [book#16], Inner, BuildRight
:- *(2) Project [_1#3 AS author#6, _2#4 AS book#7]
: +- *(2) Filter isnotnull(_2#4)
: +- *(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
: +- Scan ExternalRDDScan[obj#2]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *(1) Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
+- *(1) Filter isnotnull(_1#13)
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#13, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#14]
+- Scan ExternalRDDScan[obj#12]
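As for whether these settings work on a Community Edition notebook: the optimizer also broadcasts automatically when it can estimate a relation's size below spark.sql.autoBroadcastJoinThreshold (10 MB by default). DataFrames built from sc.parallelize typically carry no size statistics, so the auto-broadcast path does not fire for them. A minimal sketch under those assumptions; the explicit hint remains the reliable option here:

// Default is 10485760 bytes (10 MB); setting it to -1 disables auto-broadcast.
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Raising the threshold may still not help RDD-backed DataFrames, since the
// planner cannot estimate their size; the hint bypasses the estimate entirely.
import org.apache.spark.sql.functions.broadcast
df.join(broadcast(dfBook), "book").explain(true)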