写入 HDFS 时 Apache spark 中的任务数

Posted

技术标签:

【中文标题】写入 HDFS 时 Apache spark 中的任务数【英文标题】:Number of Task in Apache spark while writing into HDFS 【发布时间】:2021-07-21 07:04:11 【问题描述】:

我正在尝试读取 csv 文件,然后添加一些列。之后尝试以兽人格式保存。

我无法理解 spark 如何决定不同阶段的任务数量。

为什么 CSV 阶段的任务数是 1,而 ORC 阶段的任务数是 39?

val c1c8 = spark.read.option("header",true).csv("/user/DEEPAK_TEST/C1C6_NEW/") 
val c1c8new =  c1c8.withColumnRenamed("c1c6_F","c1c8").withColumnRenamed("Network_Out","c1c8_network").withColumnRenamed("Access NE Out","c1c8_access_ne")
.withColumn("c1c8_signalling",when (col("signalling_Out") === "SIP Cl4" , "SIP CL4").when (col("signalling_Out") === "SIP cl4" , "SIP CL4").when (col("signalling_Out") === "Other" , "other").otherwise(col("signalling_Out")))
.withColumnRenamed("access type Out","c1c8_access_type").withColumnRenamed("Type_of_traffic_C","c1c8_typeoftraffic")
.withColumnRenamed("BOS traffic type Out","c1c8_bos_trafc_typ").withColumnRenamed("Scope_Out","c1c8_scope")
.withColumnRenamed("Join with UP-DWN SIP cl5 T1T7 Out","c1c8_join_indicator")
.select("c1c8","c1c8_network", "c1c8_access_ne", "c1c8_signalling", "c1c8_access_type", "c1c8_typeoftraffic",
 "c1c8_bos_trafc_typ", "c1c8_scope","c1c8_join_indicator")
 
c1c8new.write.orc("/user/DEEPAK_TEST/C1C8_MAPPING_NEWT/")

【问题讨论】:

【参考方案1】:

以下是我查看 Spark 2.x 源代码的理解。

Stage 0 是一个文件扫描,它创建 FileScanRDD,它是一个扫描文件分区列表的 RDD。当您从多个分区目录(例如分区 Hive 表)读取数据时,此阶段可能有多个任务。

Stage 1 中的任务数将等于 RDD 分区数。在您的情况下,c1c8new.rdd.getNumPartitions 将是 39。此数字使用以下公式计算:

配置值spark.files.maxPartitionBytes(默认128MB) 任务调度器返回的sparkContext.defaultParallelism(在本地模式下等于内核数) totalBytes

DataSourceScanExec.scala#L423

val defaultMaxSplitBytes =
      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
    val bytesPerCore = totalBytes / defaultParallelism

    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")

如果您将日志级别设置为 INFO - spark.sparkContext.setLogLevel("INFO"),您可以在上述日志消息中看到实际计算值

在您的情况下,我认为拆分大小为 128,因此,任务/分区的数量大约为 4.6G/128MB

附带说明,您可以通过在数据帧上使用 repartition() 或 coalesce() 来更改分区数(以及后续阶段中的任务数)。更重要的是,shuffle 后的分区数由 spark.sql.shuffle.partitions 决定(默认为 200)。如果你有 shuffle,最好使用这个配置来控制任务的数量,因为在 stage 之间插入 repartition() 或 coalesce() 会增加额外的开销。

对于大型 Spark SQL 工作负载,在每个阶段为 spark.sql.shuffle.partitions 设置最佳值始终是一个痛点。如果启用了自适应查询执行,Spark 3.x 对此提供了更好的支持,但我还没有尝试将它用于任何生产工作负载。

【讨论】:

以上是关于写入 HDFS 时 Apache spark 中的任务数的主要内容,如果未能解决你的问题,请参考以下文章

在本地文件系统(不是HDFS)中使用Scala读取Apache Spark中的文件时如何更改输入块大小[重复]

从 Java 中的 spark 转换函数写入 HDFS 中的文件

从 Spark 估计要写入 HDFS 的部分文件数

无法在 Spark 中将文件写入 Zeppelin 上的远程 hdfs

名称中的日期时 Spark CSV 文件写入错误 - InvalidClassException:org.apache.commons.lang3.time.FastDateParser

从 Azure Synapse 中的 Apache Spark 将数据写入 SQL DW