写入 HDFS 时 Apache spark 中的任务数
Posted
技术标签:
【中文标题】写入 HDFS 时 Apache spark 中的任务数【英文标题】:Number of Task in Apache spark while writing into HDFS 【发布时间】:2021-07-21 07:04:11 【问题描述】:我正在尝试读取 csv 文件,然后添加一些列。之后尝试以兽人格式保存。
我无法理解 spark 如何决定不同阶段的任务数量。
为什么 CSV 阶段的任务数是 1,而 ORC 阶段的任务数是 39?
val c1c8 = spark.read.option("header",true).csv("/user/DEEPAK_TEST/C1C6_NEW/")
val c1c8new = c1c8.withColumnRenamed("c1c6_F","c1c8").withColumnRenamed("Network_Out","c1c8_network").withColumnRenamed("Access NE Out","c1c8_access_ne")
.withColumn("c1c8_signalling",when (col("signalling_Out") === "SIP Cl4" , "SIP CL4").when (col("signalling_Out") === "SIP cl4" , "SIP CL4").when (col("signalling_Out") === "Other" , "other").otherwise(col("signalling_Out")))
.withColumnRenamed("access type Out","c1c8_access_type").withColumnRenamed("Type_of_traffic_C","c1c8_typeoftraffic")
.withColumnRenamed("BOS traffic type Out","c1c8_bos_trafc_typ").withColumnRenamed("Scope_Out","c1c8_scope")
.withColumnRenamed("Join with UP-DWN SIP cl5 T1T7 Out","c1c8_join_indicator")
.select("c1c8","c1c8_network", "c1c8_access_ne", "c1c8_signalling", "c1c8_access_type", "c1c8_typeoftraffic",
"c1c8_bos_trafc_typ", "c1c8_scope","c1c8_join_indicator")
c1c8new.write.orc("/user/DEEPAK_TEST/C1C8_MAPPING_NEWT/")
【问题讨论】:
【参考方案1】:以下是我查看 Spark 2.x 源代码的理解。
Stage 0 是一个文件扫描,它创建 FileScanRDD,它是一个扫描文件分区列表的 RDD。当您从多个分区目录(例如分区 Hive 表)读取数据时,此阶段可能有多个任务。
Stage 1 中的任务数将等于 RDD 分区数。在您的情况下,c1c8new.rdd.getNumPartitions
将是 39。此数字使用以下公式计算:
spark.files.maxPartitionBytes
(默认128MB)
任务调度器返回的sparkContext.defaultParallelism
(在本地模式下等于内核数)
totalBytes
DataSourceScanExec.scala#L423
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")
如果您将日志级别设置为 INFO - spark.sparkContext.setLogLevel("INFO")
,您可以在上述日志消息中看到实际计算值
在您的情况下,我认为拆分大小为 128,因此,任务/分区的数量大约为 4.6G/128MB
附带说明,您可以通过在数据帧上使用 repartition() 或 coalesce() 来更改分区数(以及后续阶段中的任务数)。更重要的是,shuffle 后的分区数由 spark.sql.shuffle.partitions 决定(默认为 200)。如果你有 shuffle,最好使用这个配置来控制任务的数量,因为在 stage 之间插入 repartition() 或 coalesce() 会增加额外的开销。
对于大型 Spark SQL 工作负载,在每个阶段为 spark.sql.shuffle.partitions 设置最佳值始终是一个痛点。如果启用了自适应查询执行,Spark 3.x 对此提供了更好的支持,但我还没有尝试将它用于任何生产工作负载。
【讨论】:
以上是关于写入 HDFS 时 Apache spark 中的任务数的主要内容,如果未能解决你的问题,请参考以下文章
在本地文件系统(不是HDFS)中使用Scala读取Apache Spark中的文件时如何更改输入块大小[重复]
从 Java 中的 spark 转换函数写入 HDFS 中的文件
无法在 Spark 中将文件写入 Zeppelin 上的远程 hdfs
名称中的日期时 Spark CSV 文件写入错误 - InvalidClassException:org.apache.commons.lang3.time.FastDateParser