Broadcast dataframe is not removing shuffle phase (few rows of data) [duplicate]

Posted: 2020-01-29 03:37:03

I am learning Spark with a Databricks Community Edition notebook and created sample data with just a few rows. Since the data is so small, there should not be an Exchange stage in the query plan. I also tried broadcasting, but I still see the Exchange stage. Do these settings not work on a Databricks Community Edition notebook?

import org.apache.spark.sql.functions.{col, regexp_replace}

val authorBook = sc.parallelize(Seq(("a1" , "b1") , ("a1","b2") , ("a2" , "b3"),("a3" , "b4")))
val schemaColumn = Seq("author","book")
val df = authorBook.toDF(schemaColumn:_*)

val bookSold = sc.parallelize(Seq(("b1",100) , ("b2" , 500) , ("b3" , 400) , ("b4" , 500)) )
val bookSchema = Seq("book" , "sold_copy")
val dfBook = bookSold.toDF(bookSchema:_*)

///val totalBookSold = df.repartition(4,col("book")).join(dfBook.repartition(4,col("book")) , "book")

sc.broadcast(dfBook)
val totalBookSold = df.join(dfBook , "book")
totalBookSold.explain(true)

The query plan is the same with and without the broadcast:

== Physical Plan ==
*(3) Project [book#698, author#697, sold_copy#708]
+- *(3) SortMergeJoin [book#698], [book#707], Inner
   :- Sort [book#698 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(book#698, 200), [id=#2071]
   :     +- *(1) Project [_1#694 AS author#697, _2#695 AS book#698]
   :        +- *(1) Filter isnotnull(_2#695)
   :           +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#694, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#695]
   :              +- Scan[obj#693]
   +- Sort [book#707 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(book#707, 200), [id=#2079]
         +- *(2) Project [_1#704 AS book#707, _2#705 AS sold_copy#708]
            +- *(2) Filter isnotnull(_1#704)
               +- *(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#704, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#705]
                  +- Scan[obj#703]

This link solved my problem:

Broadcast not happening while joining dataframes in Spark 1.6

Comments:

Does this answer your question? How to access broadcasted DataFrame in Spark — No, the two questions are different. The linked question is about how to access a broadcast DF; my question is about query optimization, since broadcasting a small DF is the widely advertised way to avoid a shuffle.

Answer 1:

You are still seeing the shuffle because the broadcast is not actually applied to the join: sc.broadcast(dfBook) only creates a broadcast variable and does not tell the optimizer to use a broadcast join.

Try the following instead:

import org.apache.spark.sql.functions.broadcast

val totalBookSold = df.join(broadcast(dfBook), "book")
totalBookSold.explain(true)
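
As a side note (my addition, not part of the original answer), the same broadcast hint can also be expressed directly on the Dataset or as a SQL hint; the temp view names below are made up for illustration:

// Equivalent ways to request a broadcast join (sketch).
val totalBookSold2 = df.join(dfBook.hint("broadcast"), "book")

// Or through a SQL hint, using illustrative temp view names.
df.createOrReplaceTempView("author_book")
dfBook.createOrReplaceTempView("book_sold")
val totalBookSold3 = spark.sql(
  """SELECT /*+ BROADCAST(b) */ a.book, a.author, b.sold_copy
     FROM author_book a JOIN book_sold b ON a.book = b.book""")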

You should see the broadcast hint carried through the logical plans and a corresponding difference in the physical plan.

Similar to this:

== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(book))
:- Project [_1#3 AS author#6, _2#4 AS book#7]
:  +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
:     +- ExternalRDD [obj#2]
+- ResolvedHint (broadcast)
   +- Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#13, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#14]
         +- ExternalRDD [obj#12]

== Analyzed Logical Plan ==
book: string, author: string, sold_copy: int
Project [book#7, author#6, sold_copy#17]
+- Join Inner, (book#7 = book#16)
   :- Project [_1#3 AS author#6, _2#4 AS book#7]
   :  +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
   :     +- ExternalRDD [obj#2]
   +- ResolvedHint (broadcast)
      +- Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
         +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#13, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#14]
            +- ExternalRDD [obj#12]

== Optimized Logical Plan ==
Project [book#7, author#6, sold_copy#17]
+- Join Inner, (book#7 = book#16)
   :- Project [_1#3 AS author#6, _2#4 AS book#7]
   :  +- Filter isnotnull(_2#4)
   :     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
   :        +- ExternalRDD [obj#2]
   +- ResolvedHint (broadcast)
      +- Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
         +- Filter isnotnull(_1#13)
            +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#13, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#14]
               +- ExternalRDD [obj#12]

== Physical Plan ==
*(2) Project [book#7, author#6, sold_copy#17]
+- *(2) BroadcastHashJoin [book#7], [book#16], Inner, BuildRight
   :- *(2) Project [_1#3 AS author#6, _2#4 AS book#7]
   :  +- *(2) Filter isnotnull(_2#4)
   :     +- *(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
   :        +- Scan ExternalRDDScan[obj#2]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *(1) Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
         +- *(1) Filter isnotnull(_1#13)
            +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#13, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#14]
               +- Scan ExternalRDDScan[obj#12]
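
One more note, also my addition rather than part of the original answer: Spark can pick a broadcast join automatically when the smaller side's estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default), but a DataFrame built from sc.parallelize(...).toDF usually has no useful size statistics, so the automatic path tends not to kick in and the explicit broadcast() hint is the more reliable choice. A minimal sketch for inspecting or adjusting the threshold:

// Sketch: read and tune the automatic broadcast threshold (value in bytes).
// Setting it to -1 disables automatic broadcast joins entirely.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "104857600")  // 100 MB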

