Why Does Adding LIMIT to a Large-Table Subquery in Spark Cause a Broadcast Timeout Error?
Posted by 格格巫 MMQ
When joining two tables where one is large and the other is small, the normal map-reduce flow requires a shuffle, which sends the large table's data across the network between nodes. A common optimization is to load the small table into memory and broadcast it to the nodes processing the large table, avoiding the shuffle and reduce phases.
In Hive this is called a map join (map-side join), controlled by hive.auto.convert.join.
In Spark it is called a broadcast hash join (BroadcastHashJoin).
Spark SQL uses broadcast join (aka broadcast hash join) instead of hash join to optimize join queries when the size of one side data is below spark.sql.autoBroadcastJoinThreshold.
Broadcast join can be very efficient for joins between a large table (fact) with relatively small tables (dimensions) that could then be used to perform a star-schema join. It can avoid sending all data of the large table over the network.
There are several ways to trigger it:
1) SQL hint (supported since Spark 2.3)
SELECT /*+ MAPJOIN(b) */ …
SELECT /*+ BROADCASTJOIN(b) */ …
SELECT /*+ BROADCAST(b) */ …
2) The broadcast function (org.apache.spark.sql.functions.broadcast):
val testTable3 = testTable1.join(broadcast(testTable2), Seq("id"), "right_outer")
3) Automatic optimization, decided in
org.apache.spark.sql.execution.SparkStrategies.JoinSelection
private def canBroadcast(plan: LogicalPlan): Boolean =
  plan.statistics.isBroadcastable ||
    (plan.statistics.sizeInBytes >= 0 &&
      plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)
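In other words, a plan whose estimated size is at or below spark.sql.autoBroadcastJoinThreshold (10 MB by default) is broadcast automatically. A minimal sketch of checking, raising, or disabling that threshold at runtime, assuming an existing SparkSession named spark (the 100 MB value is only illustrative):

// Check the current threshold; the value is in bytes, default 10485760 (10 MB)
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Raise it to 100 MB ...
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)

// ... or set it to -1 to turn off automatic broadcast joins altogether
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)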
For example:
spark-sql> explain select * from big_table1 a, (select * from big_table2 limit 10) b where a.id = b.id;
18/09/17 18:14:09 339 WARN Utils66: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
== Physical Plan ==
BroadcastHashJoin [id#5], [id#14], Inner, BuildRight
:- *Filter isnotnull(id#5)
:  +- HiveTableScan [name#4, id#5], MetastoreRelation big_table1
+- BroadcastExchange HashedRelationBroadcastMode(List(input[6, string, false]))
   +- Filter isnotnull(id#14)
      +- GlobalLimit 10
         +- Exchange SinglePartition
            +- LocalLimit 10
               +- HiveTableScan [id#14, ... 187 more fields], MetastoreRelation big_table2
Time taken: 4.216 seconds, Fetched 1 row(s)
The execution of BroadcastExchange is implemented in
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
  ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout)
    .asInstanceOf[broadcast.Broadcast[T]]
}
Here timeout is spark.sql.broadcastTimeout, which defaults to 300 seconds.
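If a broadcast genuinely needs longer, the wait can be extended. A small sketch, assuming a SparkSession named spark (600 is just an example value; the property is in seconds):

// spark.sql.broadcastTimeout is in seconds; the default is 300
spark.conf.set("spark.sql.broadcastTimeout", 600)

// equivalently at submit time:
//   spark-submit --conf spark.sql.broadcastTimeout=600 ...

The relationFuture that doExecuteBroadcast waits on is built as follows: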
private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
  // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
  val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  Future {
    // This will run in another thread. Set the execution id so that we can connect these jobs
    // with the correct execution.
    SQLExecution.withExecutionId(sparkContext, executionId) {
      try {
        val beforeCollect = System.nanoTime()
        // Note that we use .executeCollect() because we don't want to convert data to Scala types
        val input: Array[InternalRow] = child.executeCollect()
        if (input.length >= 512000000) {
          throw new SparkException(
            s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
        }
        val beforeBuild = System.nanoTime()
        longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
        val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
        longMetric("dataSize") += dataSize
        if (dataSize >= (8L << 30)) {
          throw new SparkException(
            s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
        }
        // Construct and broadcast the relation.
        val relation = mode.transform(input)
        val beforeBroadcast = System.nanoTime()
        longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
        val broadcasted = sparkContext.broadcast(relation)
        longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
        SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
        broadcasted
So broadcasting a table means first executing its child plan and collecting the rows to the driver, then broadcasting them via SparkContext; this all runs asynchronously in another thread, and the caller waits on the future for at most spark.sql.broadcastTimeout. That is exactly why the query above can time out: because of the limit 10, the planner estimates the subquery as tiny and chooses to broadcast it, but producing those 10 rows still has to launch a task on every partition of big_table2 and shuffle the partial results to a single partition (the Exchange SinglePartition under GlobalLimit), which on a truly large table and a busy cluster can easily take longer than 300 seconds.
A related example (a user question): I have a small DataFrame that is fairly expensive to compute. I then broadcast-join the small DataFrame with a much larger one.
The code looks like this:
val largeDf = loadLargeDfFromHive()
// Cached for later re-use
val smallDf = expensiveComputation().cache()
val joined = largeDf.as("a").join(broadcast(smallDf.as("b")), $"a.key" === $"b.key", "inner")
Now, on the cluster, the broadcast join occasionally exceeds the 300-second broadcast timeout. I could increase the threshold, but picking a value is fairly arbitrary, and if the cluster is busy I may still hit the timeout.
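The same mechanism is at work here: the expensive computation of smallDf runs inside the broadcast future and must finish within spark.sql.broadcastTimeout. A minimal, hedged sketch of one common workaround, reusing the names from the snippet above (loadLargeDfFromHive and expensiveComputation are the question's own placeholders): force the expensive DataFrame into the cache before the join, so the broadcast only has to collect already-computed rows.

// Materialize smallDf eagerly so the expensive computation does not run inside
// the broadcast future.
val smallDf = expensiveComputation().cache()
smallDf.count()   // an action that forces the computation and fills the cache

// The broadcast now only collects cached rows, which should fit comfortably
// within spark.sql.broadcastTimeout.
val joined = largeDf.as("a")
  .join(broadcast(smallDf.as("b")), $"a.key" === $"b.key", "inner")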