如何在 Spark 中确定最佳 shuffle 分区数

Posted

技术标签:

【中文标题】如何在 Spark 中确定最佳 shuffle 分区数【英文标题】:How to identify the optimum number of shuffle partition in Spark 【发布时间】:2020-04-04 14:03:48 【问题描述】:

我正在 EMR 中运行 Spark 结构化流式传输作业(每天反弹)。执行几个小时后,我的应用程序出现 OOM 错误并被杀死。以下是我的配置和 Spark SQL 代码。 我是 Spark 新手,需要您的宝贵意见。

EMR 有 10 个实例,具有 16 个内核和 64GB 内存。

Spark-提交参数:

   num_of_executors: 17
   executor_cores: 5
   executor_memory: 19G
   driver_memory: 30G

Job 正在以 30 秒的间隔从 Kafka 中以微批次的形式读取输入。每批读取的平均行数为 90k。

  spark.streaming.kafka.maxRatePerPartition: 4500
  spark.streaming.stopGracefullyOnShutdown: true
  spark.streaming.unpersist: true
  spark.streaming.kafka.consumer.cache.enabled: true
  spark.hadoop.fs.s3.maxRetries: 30 
  spark.sql.shuffle.partitions: 2001

Spark SQL 聚合代码:

dataset.groupBy(functions.col(NAME),functions.window(functions.column(TIMESTAMP_COLUMN),30))
            .agg(functions.concat_ws(SPLIT, functions.collect_list(DEPARTMENT)).as(DEPS))
            .select(NAME,DEPS)
            .map((row) -> 
              Map<String, Object> map = Maps.newHashMap();
              map.put(NAME, row.getString(0));
              map.put(DEPS, row.getString(1));
              return new KryoMapSerializationService().serialize(map);
            , Encoders.BINARY());

来自驱动程序的一些日志:

20/04/04 13:10:51 INFO TaskSetManager: Finished task 1911.0 in stage 1041.0 (TID 1052055) in 374 ms on <host> (executor 3) (1998/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1925.0 in stage 1041.0 (TID 1052056) in 411 ms on  <host> (executor 3) (1999/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1906.0 in stage 1041.0 (TID 1052054) in 776 ms on  <host> (executor 3) (2000/2001)
20/04/04 13:11:04 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 3.
20/04/04 13:11:04 INFO DAGScheduler: Executor lost: 3 (epoch 522)
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3,  <host>, 38533, None)
20/04/04 13:11:04 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
20/04/04 13:11:04 INFO YarnAllocator: Completed container container_1582797414408_1814_01_000004 on host:  <host> (state: COMPLETE, exit status: 143)

顺便说一句,我在我的 forEachBatch 代码中使用了 collectasList

  List<Event> list = dataset.select("value")
        .selectExpr("deserialize(value) as rows")
        .select("rows.*")
        .selectExpr(NAME, DEPS)
        .as(Encoders.bean(Event.class))
        .collectAsList();

【问题讨论】:

谁能告诉我上述方法有什么问题? 首先,如果你有 17executors*5 个核心,请确保你的源 kafka 主题中有相同​​数量的 kafka 分区。其次,如果可以的话,使用 NAME 键写入源 kafka,这意味着 groupby 不会有 shuffle,因为每个键都会进入同一个分区。第三,collect_list 总是非常危险的操作,如果你有数据倾斜(这是因为一个键有最多结果),它很容易导致 OOM。编辑:请从驱动程序和失败的执行程序执行更多日志。 我必须考虑一下,但您的问题源于使用编码器。数据集是一个陷阱。如果您可以对其进行返工以使用数据框操作,那么您的状态会好得多。你能提供一些示例输入和输出吗? 【参考方案1】:

使用这些设置,您可能会遇到自己的问题。

   num_of_executors: 17
   executor_cores: 5
   executor_memory: 19G
   driver_memory: 30G

您基本上是在此处创建额外的容器,以便在它们之间进行切换。相反,从 10 个执行程序、15 个内核、60g 内存开始。如果这行得通,那么您可以试玩一下以尝试优化性能。我通常尝试将容器分成两半(但自 spark 2.0 以来我也不需要这样做)。

让 Spark SQL 将默认值保持在 200。您将其分解得越多,Spark 计算洗牌次数的数学就越多。如果有的话,我会尝试使用与执行程序相同数量的并行度,所以在这种情况下只有 10 个。当 2.0 出来时,这就是调整 hive 查询的方式。 让工作变得复杂而难以拆分会将所有的负担都放在主人身上。

使用数据集和编码通常也不如直接使用 DataFrame 操作的性能好。我发现为数据帧操作考虑这一点后,性能有了很大提升。

【讨论】:

以上是关于如何在 Spark 中确定最佳 shuffle 分区数的主要内容,如果未能解决你的问题,请参考以下文章

Spark ShuffleExecutor是如何fetch shuffle的数据文件

spark shuffle 内幕彻底解密课程

spark性能调优 spark shuffle中JVM内存使用及配置内幕详情

大数据:Spark ShuffleExecutor是如何fetch shuffle的数据文件

Spark Dataframe Join shuffle

详细探究Spark的shuffle实现