为啥 apache spark 中的这两个阶段计算相同的东西?

Posted

技术标签:

【中文标题】为啥 apache spark 中的这两个阶段计算相同的东西?【英文标题】:Why those two stages in apache spark are computing same thing?为什么 apache spark 中的这两个阶段计算相同的东西? 【发布时间】:2016-10-22 12:42:48 【问题描述】:

我是 spark 新手,我有两个长时间运行的阶段,它们几乎做同样的事情。下面是我的伪代码。

var metaData = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true") 
  .option("inferSchema", "true") 
  .load(csvFile)

val met = broadcast(metaData.dropDuplicates(Seq("col1")))


val accessLogs = sc.textFile(logFile).filter(line => regex.pattern.matcher(line).matches).map(line => LogParser.parseLogLine(line)).toDF()




val joinOutput = accessLogs.join(met,accessLogs("col1") === met("col1"),"left_outer")

val uniqueDfNames2 = Seq("col0", "col1", "col2", "col3","col4")
val sparseFilter = joinOutput
                    .filter(joinOutput.col("col1").isNotNull)
                    .filter(joinOutput.col("col2").isNotNull)
                    .flatMap(row=>ListParser.parseLogLine(row))
sparseFilter.cache()

val uniqueCount = sparseFilter
                    .filterr=>r.col0 != null && r.col0 != "" 
                    .map
                          case(KeyValParse(col0,col1,col2,col3,col4,col5))=>((col0,col1,col2,col3,col4,col5),1)
                        
                    .distinct().cache()
                    .map case ((col0,col1,col2,col3,col4),count) => ((col0,col1,col2,col3,col4),1)
                    
                    .reduceByKey(_+_)
                    .map case ((col0,col1,col2,col3,col4),count) => (col0,col1,col2,col3,col4,count)
                    
                    .toDF(uniqueDfNames: _*).cache()

val totalCount = sparseFilter
                  .map
                        case(Parse(col0,col1,col2,col3,col4,col5))=>((col0,col1,col2,col3,col4),1)
                      
                  .reduceByKey(_+_)
                  .map
                        case ((col0,col1,col2,col3,col4),totcount) => (col0,col1,col2,col3,col4,totcount)
                      
                  .toDF(uniqueDfNames2: _*)
                  .join(uniqueCount,Seq("col0", "col1", "col2", "col3"),"left")
                  .select($"col0",$"col1",$"col2",$"col3",$"unicount",$"totcount")
                  .orderBy($"unicount".desc)
                  .toDF(totalDfNames: _*)

totalCount
  .select("*")
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "|")
  .save(countPath)

我在这里尝试做的是根据一些参数从日志中生成唯一计数和总数。

一切正常,但是这两个长期运行的阶段共享几乎相同的 DAG。

以下是两个阶段的镜头。

请查看下面给出的两个阶段的屏幕截图。

直到平面图任务,他们都做同样的事情。 为什么这些不合并为一个阶段? 为什么第 11 阶段要重新读取文件并再次进行所有计算是我无法猜测的?

对于具有 10 个执行程序(7 核,15Gb RAM)的 20Gb 数据,它需要将近 30 分钟才能完成,但我觉得这可以减少到相当短的时间。

任何指导将不胜感激。

PS:- 对不起我的图片编辑技巧:)

【问题讨论】:

【参考方案1】:

RDD 在第一次在操作中计算时被缓存。代码中的第一个操作是“distinct”,即“sparseFilter”RDD 被缓存时。所以第一次缓存操作可能对后续阶段没有用。第一阶段的输出是一个不同的 RDD,但稍后您指的是 sparseFilter。所以 Spark 不得不重新计算 RDD。

我认为逻辑可以稍有不同。如果我理解正确,对于 totalCount 和 uniqueCount,代码使用相同的列集(col0、col1、col2、col3、col4)。那么在totalCount计算中,在reduceByKey之后,简单的count应该给出uniqueCount吗?通过这种方式可以避免额外的 distinct、reduceByKey、join 等。

【讨论】:

对不起,我在代码中犯了一个错误,uniqueCount RDD 实际上第一次使用 col0-col5 并采用不同的然后采用 col0-col4 来计算计数(将 col5 视为一些一种 userId,我想为唯一用户生成计数)

以上是关于为啥 apache spark 中的这两个阶段计算相同的东西?的主要内容,如果未能解决你的问题,请参考以下文章

使用多列作为存储在 Apache Spark 中的数组中的键来连接两个 Dataframe

为啥 Apache-Spark - Python 在本地比 pandas 慢?

写入 HDFS 时 Apache spark 中的任务数

Apache Spark 中的 DataFrame 相等性

如何成为Apache Spark开发人员?

为啥 Spark 将 Map 阶段输出保存到本地磁盘?