except() 查询的 spark sql 执行需要很多时间

Posted

技术标签:

【中文标题】except() 查询的 spark sql 执行需要很多时间【英文标题】:spark sql execution of except() query takes lot time 【发布时间】:2018-08-19 07:36:39 【问题描述】:

我在 spark 应用程序中有以下代码

val log_file_df = loadInputFile() /*loads log file as Dataframe */
val split_df: DataFrame = splitLineByDelimiter(log_file_df) /* applies filter 
                                                    function to input file */
val bad_data_df: DataFrame = parseAndSaveBadData(split_df) /*filters split dataframe into bad data */

val good_data_df = split_df.except(bad_data_df) /* separates good data from bad data */

如果我在 split_df,bad_data_df 上运行诸如 show() 之类的任何操作,它的执行时间会更短(大约 1.5 分钟),并且我检查了物理计划输入日志文件是否只读一次

但如果我对好的数据执行任何操作,相对而言它会花费更多时间。(4 分钟)

val good_data_df = split_df.except(bad_data_df).show()

从物理计划输入日志文件读取两次。我尝试了以下选项

    split_df.cache() or split_df.createOrReplaceTempView("split_dfTable")
//   Init.getSparkSession.sqlContext.cacheTable("split_dfTable")
     val splitbc =  Init.getSparkSession.sparkContext.broadcast(split_df)

但是执行时间并没有改善,物理计划是一样的。 这是物理计划。我应该如何改进我的代码?我的 good_data_df 被进一步转换并与其他一些需要更多时间的数据框连接。

good_data_df.show(false)good_data_df.explain(true)

 +- Exchange hashpartitioning(hostname#16, date#17, path#18, status#19,
    content_size#20, 200)
 +- *HashAggregate(keys=[hostname#16, date#17, path#18, status#19, 
      content_size#20], functions=[], output=[hostname#16, date#17, path#18, 
       status#19, content_size#20])
   +- SortMergeJoin [coalesce(hostname#16, ), coalesce(date#17, ), 
                   coalesce(path#18, ), coalesce(status#19, ), 
    coalesce(content_size#20, )], 
[coalesce(hostname#49, ), coalesce(date#50, ), coalesce(path#51, ), 
coalesce(status#52, ), coalesce(content_size#53, )], LeftAnti, 
(((((hostname#16 <=> hostname#49) && (date#17 <=> date#50)) && (path#18 <=> 
path#51)) && (status#19 <=> status#52)) && (content_size#20 <=> 
content_size#53))
:- *Sort [coalesce(hostname#16, ) ASC NULLS FIRST, coalesce(date#17, ) ASC 
NULLS FIRST, coalesce(path#18, ) ASC NULLS FIRST, coalesce(status#19, ) ASC 
NULLS FIRST, coalesce(content_size#20, ) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(coalesce(hostname#16, ), coalesce(date#17, ), 
coalesce(path#18, ), coalesce(status#19, ), coalesce(content_size#20, ), 200)
:     +- *Project [regexp_extract(val#13, ^([^\s]+\s), 1) AS hostname#16, 
regexp_extract(val#13, ^.*(\d\d/\w3/\d4:\d2:\d2:\d2 -\d4), 1) AS 
date#17, regexp_extract(val#13, ^.*"\w+\s+([^\s]+)\s*[(HTTP)]*.*", 1) AS 
path#18, regexp_extract(val#13, ^.*"\s+([^\s]+), 1) AS status#19, 
regexp_extract(val#13, ^.*\s+(\d+)$, 1) AS content_size#20]
:        +- *FileScan csv [val#13] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:/C:/Users/M1047320/Desktop/access_log_Jul95], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<val:string>
+- *Sort [coalesce(hostname#49, ) ASC NULLS FIRST, coalesce(date#50, ) ASC 
NULLS FIRST, coalesce(path#51, ) ASC NULLS FIRST, coalesce(status#52, ) ASC 

NULLS FIRST, coalesce(content_size#53, ) ASC NULLS FIRST], false, 0
        +- Exchange hashpartitioning(coalesce(hostname#49, ), 
coalesce(date#50, ), coalesce(path#51, ), coalesce(status#52, ), 
coalesce(content_size#53, ), 200)
+- *Project [regexp_extract(val#13, ^([^\s]+\s), 1) AS hostname#49, 
regexp_extract(val#13, ^.*(\d\d/\w3/\d4:\d2:\d2:\d2 -\d4), 1) AS 
date#50, regexp_extract(val#13, ^.*"\w+\s+([^\s]+)\s*[(HTTP)]*.*", 1) AS 
path#51, regexp_extract(val#13, ^.*"\s+([^\s]+), 1) AS status#52, 
regexp_extract(val#13, ^.*\s+(\d+)$, 1) AS content_size#53]
              +- *Filter ((((regexp_extract(val#13, ^.*"\w+\s+([^\s]+)\s* 
[(HTTP)]*.*", 1) RLIKE .*(jpg|gif|png|xbm|jpeg|wav|mpg|pl)$ || 
(regexp_extract(val#13, ^([^\s]+\s), 1) = )) || (regexp_extract(val#13, 
^.*"\w+\s+([^\s]+)\s*[(HTTP)]*.*", 1) = )) || (regexp_extract(val#13, ^.* 
(\d\d/\w3/\d4:\d2:\d2:\d2 -\d4), 1) = )) || 
(regexp_extract(val#13, ^.*"\s+([^\s]+), 1) = ))
                 +- *FileScan csv [val#13] Batched: false, Format: CSV,
Location: 
InMemoryFileIndex[file:/C:/Users/M1047320/Desktop/access_log_Jul95], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<val:string>

【问题讨论】:

【参考方案1】:

显示 split_df,bad_data_df 所需时间显着减少的原因是 Spark 只会读取和解析节目所需的行。您读入 Spark 的数据被分解为 Partitions,它们是数据片段,将在工作人员之间进行划分。

在 split_df 上调用 show 后,bad_data_df Spark 将仅适用于一小部分数据(split_df 上只有 20 行,bad_data_df 上只有前 20 行坏行)。

另一方面,当在 good_data_df 上调用 show 时,Spark 将不得不处理 所有 数据(以读取所有数据、解析数据并从全部的)。

如果您有一种定义坏行的简单方法,我建议添加另一列 UDF - Boolean isBad,并对其进行过滤。简单地传递数据比except简单得多。

【讨论】:

感谢您的帮助。我知道火花转换是懒惰的。我的疑问是为什么即使在缓存之后,文件扫描也会在物理计划中出现两次【参考方案2】:

缓存不是一个动作。因此,如果您在 good_data_df.show() 之前执行 split_df.cache(),则只会为 good_data_df.show 而不是 split_df.cache 创建 DAG。 split_df 的缓存将作为一个阶段执行,但 good_data_df 将无法使用该缓存。要让 good_data_df 使用缓存数据,只需在 split_df.cache 之后使用 split_df.take(1),这实际上会使 split_df 缓存以供 good_data_df 使用。

【讨论】:

以上是关于except() 查询的 spark sql 执行需要很多时间的主要内容,如果未能解决你的问题,请参考以下文章

Spark数据集何时使用Except vs Left Anti Join

Spark-SQL 是不是支持使用 regex 规范的 Hive Select All Query with except Columns

Spark:通过对临时表执行 sql 查询来创建临时表

Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询需要很长时间才能执行

SQL-Union ALL 和 except

尝试从 UDF 执行 spark sql 查询