Does a UNION with two SELECTs over different tables execute in parallel?

If I have a Spark SQL statement of the form SELECT [...] UNION ALL SELECT [...], will the two SELECT statements execute in parallel? In my specific use case, the two SELECTs query two different database tables. Contrary to what I expected, the Spark UI seems to indicate that the two SELECT statements are executed sequentially.
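For reference, a minimal sketch of the query pattern in question, assuming the two tables are registered as the temporary views trips2014 and trips2015 that appear in the parsed query under Update 2 below:

// Two SELECTs over different tables, combined with UNION ALL.
// Assumes trips2014 and trips2015 are registered as temporary views.
val df = spark.sql("""
  SELECT pickup_latitude, pickup_longitude, tip_amount, total_amount FROM trips2014
  UNION ALL
  SELECT pickup_latitude, pickup_longitude, tip_amount, total_amount FROM trips2015
""")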

== Update 1 ==

Here is the physical plan as shown in the Spark UI:

== Physical Plan ==
*Sort [avg_tip_pct#655 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(avg_tip_pct#655 DESC NULLS LAST, 4)
   +- *HashAggregate(keys=[neighborhood#163], functions=[avg(tip_pct#654)], output=[neighborhood#163, avg_tip_pct#655])
      +- Exchange hashpartitioning(neighborhood#163, 4)
         +- *HashAggregate(keys=[neighborhood#163], functions=[partial_avg(tip_pct#654)], output=[neighborhood#163, sum#693, count#694L])
            +- *Project [neighborhood#163, (tip_amount#513 / total_amount#514) AS tip_pct#654]
               +- InMemoryTableScan [neighborhood#163, tip_amount#513, total_amount#514]
                     +- InMemoryRelation [pickup_latitude#511, pickup_longitude#512, tip_amount#513, total_amount#514, neighborhood#163, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                           +- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, neighborhood#163, index#165]
                              +- *Project [pickup_latitude#301, index#165, pickup_longitude#300, neighborhood#163, total_amount#313, point#524, polygon#164, tip_amount#310]
                                 +- *SortMergeJoin [curve#578], [curve#580], Inner, ((relation#581 = Within) || Within(point#524, polygon#164))
                                    :- *Sort [curve#578 ASC NULLS FIRST], false, 0
                                    :  +- Exchange hashpartitioning(curve#578, 4)
                                    :     +- Generate inline(indexer(point#524, 30)), true, false, [curve#578, relation#579]
                                    :        +- Union
                                    :           :- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, pointconverter(pickup_longitude#300, pickup_latitude#301) AS point#524]
                                    :           :  +- *Filter ((isnotnull(total_amount#313) && payment_type#306 IN (CREDIT,CRD,1)) && (total_amount#313 > 200.0))
                                    :           :     +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2014},org.apache.spark.sql.SQLContext@3bf2de09) [pickup_latitude#301,payment_type#306,pickup_longitude#300,total_amount#313,tip_amount#310] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point...
                                    :           +- *Project [pickup_latitude#436, pickup_longitude#435, tip_amount#445, total_amount#448, pointconverter(pickup_longitude#435, pickup_latitude#436) AS point#524]
                                    :              +- *Filter ((isnotnull(total_amount#448) && payment_type#441 IN (CREDIT,CRD,1)) && (total_amount#448 > 200.0))
                                    :                 +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2015},org.apache.spark.sql.SQLContext@3bf2de09) [payment_type#441,pickup_longitude#435,pickup_latitude#436,total_amount#448,tip_amount#445] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point...
                                    +- *Sort [curve#580 ASC NULLS FIRST], false, 0
                                       +- Exchange hashpartitioning(curve#580, 4)
                                          +- Generate inline(index#165), true, false, [curve#580, relation#581]
                                             +- InMemoryTableScan [neighborhood#163, polygon#164, index#165]
                                                   +- InMemoryRelation [neighborhood#163, polygon#164, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                                                         +- *Project [UDF:metadata_string(metadata#13, neighborhood) AS neighborhood#163, polygon#12, index#15]
                                                            +- InMemoryTableScan [metadata#13, polygon#12, index#15]
                                                                  +- InMemoryRelation [point#10, polyline#11, polygon#12, metadata#13, valid#14, index#15], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `neighborhoods`
                                                                        +- *Scan GeoJSONRelation(gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson,Map(type -> geojson, magellan.index -> true, magellan.index.precision -> 30, path -> gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson)) [point#10,polyline#11,polygon#12,metadata#13,valid#14,index#15] ReadSchema: struct<point:struct<type:int,xmin:double,ymin:double,xmax:double,ymax:double,x:double,y:double>,p...

Note how the union of the two SELECTs shows up as scans over BigQueryTableRelations. These scans appear to be executed sequentially.

Each BigQuery select runs as a separate job (with a single stage each), and the jobs execute sequentially. I run a 5-node YARN cluster with 4 CPUs and 26 GB of RAM per node. I wonder whether the fact that I use a custom BigQuery data source matters; I would expect it not to. In any case, for reference, the data source can be found here: github.com/miraisolutions/spark-bigquery

== Update 2 ==

In the Spark logs, I see the following entry:

17/12/19 14:36:24 INFO SparkSqlParser: Parsing command: SELECT `pickup_latitude` AS `pickup_latitude`, `pickup_longitude` AS `pickup_longitude`, `tip_amount` AS `tip_amount`, `total_amount` AS `total_amount` FROM ((SELECT * FROM `trips2014`) UNION ALL (SELECT * FROM `trips2015`)) `ggcyamhubf` WHERE (`payment_type` IN ("CREDIT", "CRD", "1"))

Spark optimizes this query and pushes the relevant predicates down to the data source (BigQuery in this case). However, the corresponding BigQuery jobs appear to execute strictly sequentially, i.e. the second job is only triggered after the first one has completed.
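As a side illustration (hypothetical and not specific to the BigQuery connector), the same PushedFilters annotation seen in the plan above can be reproduced on any pushdown-capable source, e.g. Parquet:

import spark.implicits._

// Hypothetical Parquet-backed copy of the trips data; the path is a placeholder.
val trips = spark.read.parquet("/tmp/trips")
trips.filter($"total_amount" > 200.0).explain()
// The scan node in the printed plan reports the pushed predicates, e.g.:
//   PushedFilters: [IsNotNull(total_amount), GreaterThan(total_amount,200.0)]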

Answer

TL;DR Yes (depending on CPU availability)

As a side note: if you are ever in doubt, you could also execute the two SELECTs on their own threads and union the results afterwards (this, too, depends on the number of available CPUs), but then you would certainly get a truly parallel execution.
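A minimal sketch of that approach, assuming the table names from the question; forcing each side to materialize (here via cache plus count) on its own thread makes Spark schedule the two jobs concurrently, resources permitting:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Submit one Spark job per SELECT from its own thread; jobs submitted from
// separate threads run in parallel when executor resources allow.
val left  = Future { val df = spark.table("trips2014").cache(); df.count(); df }
val right = Future { val df = spark.table("trips2015").cache(); df.count(); df }

// Once both sides are materialized, union them (a cheap, narrow operation).
val unioned = Await.result(left, Duration.Inf)
  .union(Await.result(right, Duration.Inf))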

Let's use the following (very basic) query:

val q = spark.range(1).union(spark.range(2))

explain won't tell you much about the final execution from a CPU perspective, but it can at least tell you whether whole-stage code generation is in use and how far it extends across the query tree.

scala> q.explain
== Physical Plan ==
Union
:- *Range (0, 1, step=1, splits=8)
+- *Range (0, 2, step=1, splits=8)

In this example, the two Range physical operators (which are responsible for the two separate Datasets) get "codegened", so their execution is pipelined. Their execution time is the time it takes to process all the rows in their partitions (as fast as the generated Java code itself permits; you could only slow it down artificially, e.g. with System.sleep or similar "mechanisms").

The RDD lineage of the query gives you more insight into its execution:

scala> q.rdd.toDebugString
res4: String =
(16) MapPartitionsRDD[17] at rdd at <console>:26 []
 |   MapPartitionsRDD[16] at rdd at <console>:26 []
 |   UnionRDD[15] at rdd at <console>:26 []
 |   MapPartitionsRDD[11] at rdd at <console>:26 []
 |   MapPartitionsRDD[10] at rdd at <console>:26 []
 |   ParallelCollectionRDD[9] at rdd at <console>:26 []
 |   MapPartitionsRDD[14] at rdd at <console>:26 []
 |   MapPartitionsRDD[13] at rdd at <console>:26 []
 |   ParallelCollectionRDD[12] at rdd at <console>:26 []

Unless I'm mistaken, since there are no stage boundaries in between, there is not much more you could parallelize: it is just a single stage with 16 partitions, which completes as fast as its last task (out of the 16 tasks to be scheduled).
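A quick way to confirm the 16 partitions seen at the top of the lineage above:

scala> q.rdd.getNumPartitions
res5: Int = 16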

That means that, in this case, the order does indeed matter.


I also found this JIRA issue about UNION ALL that looks similar to, if not exactly like, your case.
