通过可能增加分区或任务的数量来提高 Spark DataFrame 到 RDD 转换的速度

Posted

技术标签:

【中文标题】通过可能增加分区或任务的数量来提高 Spark DataFrame 到 RDD 转换的速度【英文标题】:Increasing the speed for Spark DataFrame to RDD conversion by possibly increasing the number of partitions or tasks 【发布时间】:2017-05-29 17:18:39 【问题描述】:

我在尝试将 DF 转换为 RDD 时遇到了问题。这个过程中的一个阶段总共使用了 200 个任务,而在此之前的大多数部分使用了更多,我无法理解为什么它使用这个数字以及我是否需要找到一种方法来增加这个数字以提高性能。

该程序使用 Spark 版本 2.1.0,并在我使用 250 个执行器的 Yarn 集群上运行。

这些是 DF 转换为 RDD 的行:

val predictionRdd = selectedPredictions
    .withColumn("probabilityOldVector", convertToOldVectorUdf($"probability"))
    .select("mid", "probabilityOldVector")
    .rdd

这会产生前面提到的 200 个任务,如以下屏幕截图中的活动阶段所示。

基本上就是卡在这里不知道多久了,另外两个完成的阶段用的任务明显多了。

我尝试过的一件事是在将其转换为 RDD 之前执行重新分区:

val predictionRdd = selectedPredictions
    .withColumn("probabilityOldVector", convertToOldVectorUdf($"probability"))
    .select("mid", "probabilityOldVector")
    .repartition(2000)
    .rdd

val avgPredictions = predictionRdd
    .map(row => (row.getAs[String]("mid"), row.getAs[OldVector]("probabilityOldVector")))
    .aggregateByKey(new MultivariateOnlineSummarizer)(
        (agg, v) => agg.add(v),
        (agg1, agg2) => agg1.merge(agg2)
    )
    .map(p => (p._1, p._2.mean))

假设理想情况下,这将导致执行 2000 个任务。然而,这有一个(对我来说)意想不到的结果。 This 图像(与之前相同)显示了该部分所属的整个作业。有趣的是,它仍然显示了 200 个任务,并且在将其转换为 RDD 之前重新分区的 2000 个分区在用于挂起映射阶段的任务数量中可见。

在我看来,要提高这部分的速度,我需要增加正在执行的任务的数量,使其能够更多地并行运行,而每个任务使用的内存更少。

所以我的问题基本上是:

    我是否至少在某种程度上正确理解了这种情况,还是问题完全出在其他地方, 增加正在执行的任务数量是否也会提高速度, 如何增加此部分的任务(或分区)数量,或者我可以通过哪些其他方式提高速度?

我对 Spark 还是有点陌生​​,我在某种程度上知道如何在更高的层次上解决它,但实际的复杂性仍然让我难以理解。

在撰写本文时,我注意到它在大约 1.3 小时后终于显示出一些进展,这看起来很简单。

以下是按执行者和任务汇总的小部分指标:

Executor ID  Address  Task Time  Total Tasks  Failed Tasks  Killed Tasks  Succeeded Tasks  Shuffle Read Size / Records  Shuffle Write Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)
1            -        1.4 h      1            0             0             1                1810.3 MB / 8527038          2.1 GB / 2745175              5.9 GB                  1456.3 MB 
10           -        0 ms       0            0             0             0                1808.2 MB / 8515093          0.0 B / 1839668               5.9 GB                  1456.7 MB 

Index  ID     Attempt  Status   Locality Level  Executor ID / Host                 Launch Time          Duration  Scheduler Delay  Task Deserialization Time  GC Time  Result Serialization Time  Getting Result Time  Peak Execution Memory  Shuffle Read Size / Records  Write Time  Shuffle Write Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
0      19454  0        RUNNING  PROCESS_LOCAL   197 / worker176.hathi.surfsara.nl  2017/05/29 17:23:37  1.5 h     0 ms             0 ms                       3.8 min  0 ms                       0 ms                 3.1 GB                 1809.9 MB / 8525371                      0.0 B / 1839667               5.9 GB                  1456.0 MB       
1      19455  0        SUCCESS  PROCESS_LOCAL   85 / worker134.hathi.surfsara.nl   2017/05/29 17:23:37  1.5 h     42 ms            8 s                        3.2 min  0 ms                       0 ms                 6.0 GB                 1808.3 MB / 8519686          5 s         2.1 GB / 2742924              5.9 GB                  1456.3 MB       

这里有几个其他的上下文截图,作为链接添加,以免这篇文章太长:

DAG Stage 45 DAG Stage 46 Entire job overview

Stage 45 和 46 在 47 之前同时运行。源代码的主要部分可以在 GitHub 上的thissn-p 中查看。它加载了一个先前训练过的CrossValidatorModel,由一个包含五个步骤的管道组成:CharNGramCountVectorizerIDFRandomForestClassifierIndexToString。它为大约 5.5 亿个最大长度为 550 个字符的文本 sn-ps 预测 200 个类别的概率。然后将这些预测按输入类分组在一起,然后取平均值。

【问题讨论】:

200 是 Spark SQL 中用于 shuffle 的分区数的默认值——参见spark.sql.shuffle.partitions。能附上截图看看执行计划吗?我敢肯定你会有Exchange 在某个地方,这是一个随机步骤。 我没想到这样的选项会在配置中硬编码。感谢您的提醒。改变这一点确实会改变任务的数量。我还添加了 DAG 的图像,它确实有多个 Exchange 部分。不幸的是,在我的情况下,更改分区数量似乎对速度没有直接影响,所以看起来问题出在其他地方。 你能展示一下 Stage 46 吗?我关心一个阶段的三个交流。什么你洗牌这么多? 您能否使用类型化的数据集 API 删除(as[T] 之后的所有运算符)。鉴于issues.apache.org/jira/browse/SPARK-14083,他们不会得到很多优化 .filter($"snippet".isNotNull) 替换为 personAnnotationSnippetDs .na.drop(在第 131-133 行)。为什么你在第 147 行 .rdd 并在第 152 行使用 Row?这是用于 MultivariateOnlineSummarizer 的吗?你需要152行吗?你为什么在第 185 行.coalesce(128) 【参考方案1】:

您可以使用以下方法设置任务数:

      val spConfig = (new SparkConf).setMaster("local[*]").setAppName("MoviesRec")
  // Spark UI available at port 4040.. check here also
  spark = SparkSession.builder().appName("Movies").config(spConfig)
    .config("spark.ui.enabled", true)
      .config("spark.sql.shuffle.partitions", "100")

在日志中你会得到这样的东西:[Stage 9:======================================================> (98 + 2) / 100] 200 是数据帧的默认值。 您还可以在 SparkUI 上的 localhost:4040 上检查它,检查分区数,每个分区的大小,以及您拥有的总 RAM.. 在我的桌面上,如果我将默认分区数从 200 个减少到 8 个(我拥有的内核数),它会变得非常快。但是分区 = nr 个内核可能过于简单,因为一个分区代表一个数据块(默认为 64MB)。所以一个人应该往上走,直到至少整个内存都被使用了。

【讨论】:

以上是关于通过可能增加分区或任务的数量来提高 Spark DataFrame 到 RDD 转换的速度的主要内容,如果未能解决你的问题,请参考以下文章

Spark中repartition和coalesce的用法

Spark:如何指定持有 RDD 的 executor 数量?

spark数据分区数量的原理

Spark---并行度和分区

任务中如何确定spark分区数task数目core个数worker节点个数excutor数量

Spark-sql读取hive分区表限制分区过滤条件及限制分区数量