Spark SQL:为啥一个查询有两个作业?

Posted

技术标签:

【中文标题】Spark SQL:为啥一个查询有两个作业?【英文标题】:Spark SQL: Why two jobs for one query?Spark SQL:为什么一个查询有两个作业? 【发布时间】:2016-10-11 21:03:39 【问题描述】:

实验

我在Spark 1.6.1 上尝试了以下 sn-p。

val soDF = sqlContext.read.parquet("/batchPoC/saleOrder") # This has 45 files
soDF.registerTempTable("so")
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/out/")

Physical Plan 是:

== Physical Plan ==
Sort [cnt#59L ASC], true, 0
+- ConvertToUnsafe
   +- Exchange rangepartitioning(cnt#59L ASC,200), None
      +- ConvertToSafe
         +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Final,isDistinct=false)], output=[dpHour#38,cnt#59L])
            +- TungstenExchange hashpartitioning(dpHour#38,200), None
               +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Partial,isDistinct=false)], output=[dpHour#38,count#63L])
                  +- Scan ParquetRelation[dpHour#38] InputPaths: hdfs://hdfsNode:8020/batchPoC/saleOrder

对于这个查询,我得到了两个 Job:Job 9Job 10

对于Job 9DAG 是:

对于Job 10DAG 是:

观察

    显然,一个查询有两个jobsStage-16(在Job 9 中标记为Stage-14)在Job 10 中被跳过。 Stage-15 的最后一个RDD[48],与Stage-17 的最后一个RDD[49] 相同。 如何?我在日志中看到Stage-15执行后,RDD[48]注册为RDD[49] Stage-17 显示在 driver-logs 中,但从未在 Executors 处执行。在driver-logs 上显示了任务执行,但是当我查看Yarn 容器的日志时,没有证据表明从Stage-17 收到任何task

支持这些观察的日志(仅driver-logs,由于后来的崩溃,我丢失了executor 日志)。可以看到Stage-17启动之前,RDD[49]就被注册了:

16/06/10 22:11:22 INFO TaskSetManager: Finished task 196.0 in stage 15.0 (TID 1121) in 21 ms on slave-1 (199/200)
16/06/10 22:11:22 INFO TaskSetManager: Finished task 198.0 in stage 15.0 (TID 1123) in 20 ms on slave-1 (200/200)
16/06/10 22:11:22 INFO YarnScheduler: Removed TaskSet 15.0, whose tasks have all completed, from pool 
16/06/10 22:11:22 INFO DAGScheduler: ResultStage 15 (parquet at <console>:26) finished in 0.505 s
16/06/10 22:11:22 INFO DAGScheduler: Job 9 finished: parquet at <console>:26, took 5.054011 s
16/06/10 22:11:22 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO SparkContext: Starting job: parquet at <console>:26
16/06/10 22:11:22 INFO DAGScheduler: Registering RDD 49 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Got job 10 (parquet at <console>:26) with 25 output partitions
16/06/10 22:11:22 INFO DAGScheduler: Final stage: ResultStage 18 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Submitting ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26), which has no missing parents
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 17.4 KB, free 512.3 KB)
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 8.9 KB, free 521.2 KB)
16/06/10 22:11:22 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 172.16.20.57:44944 (size: 8.9 KB, free: 517.3 MB)
16/06/10 22:11:22 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006
16/06/10 22:11:22 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26)
16/06/10 22:11:22 INFO YarnScheduler: Adding task set 17.0 with 200 tasks
16/06/10 22:11:23 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 1125, slave-1, partition 0,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 1.0 in stage 17.0 (TID 1126, slave-2, partition 1,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 2.0 in stage 17.0 (TID 1127, slave-1, partition 2,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 3.0 in stage 17.0 (TID 1128, slave-2, partition 3,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 4.0 in stage 17.0 (TID 1129, slave-1, partition 4,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 5.0 in stage 17.0 (TID 1130, slave-2, partition 5,NODE_LOCAL, 1988 bytes)

问题

    为什么是两个Jobs?将DAG 分成两个jobs 的目的是什么? Job 10DAG 看起来 完成 查询执行。 Job 9 有什么具体的工作吗? 为什么Stage-17 没有被跳过?看起来像虚拟的tasks 被创建了,他们有什么目的吗?

    后来,我尝试了另一个更简单的查询。出乎意料的是,它正在创建 3 个Jobs

    sqlContext.sql("select dpHour from so order by dphour").write.parquet("/out2/")

【问题讨论】:

我的观察是 rdd 的东西更容易理解,并且大多数文档都是基于此的。 DF 的东西确实很难与 Job、App、东西等的最初讨论联系起来。 【参考方案1】:

当您使用高级数据帧/数据集 API 时,由 Spark 决定执行计划,包括作业/阶段分块。这些取决于许多因素,例如执行并行性、缓存/持久化数据结构等。在 Spark 的未来版本中,随着优化器复杂程度的提高,您可能会看到每个查询的作业更多,例如,对某些数据源进行采样以参数化基于成本的执行优化。

例如,我经常(但并非总是)看到写作从涉及随机播放的处理中生成单独的作业。

归根结底,如果您使用的是高级 API,除非您必须对大量数据进行极其详细的优化,否则很少需要深入研究特定的分块。与处理/输出相比,作业启动成本极低。

另一方面,如果您对 Spark 内部结构感到好奇,请阅读优化器代码并加入 Spark 开发人员邮件列表。

【讨论】:

这很好奇,为什么第二个工作阶段不能在第一个工作? 好问题。它可能与中间结果生成有关。重要的问题是:为什么将 DAG 映射到阶段和作业很重要? 是的,很难真正理解 Spark 是如何做到这一点的,混合了可用资源、数据...... 我的观察是 rdd 的东西更容易理解,并且大多数文档都是基于此的。 DF 的东西确实更难与 Job、App、东西等的最初讨论联系起来 @thebluephantom RDD 计划更容易遵循,因为没有优化:你写什么,Spark 就做什么。缺点很明显:没有优化,没有高级 SQL(-like) 操作,更大的序列化/反序列化开销等。这就是为什么在大多数情况下,Spark 使用数据集比 RDD 执行得更快,即使数据集在幕后使用 RDD。

以上是关于Spark SQL:为啥一个查询有两个作业?的主要内容,如果未能解决你的问题,请参考以下文章

为啥只有一个 spark 作业只使用一个执行器运行?

在Spark SQL作业中使用地理空间函数

为啥 SQL 子查询中的外部引用会产生不同的结果?

通过 RESTful API 查询 SPARK 作业产生的数据

Spark SQL 用于从两个不同的查询中划分计数并将输出存储为 Double

双十一剑指Spark两个生产场景