Spark:读取文本文件后的重新分区策略

Posted

技术标签:

【中文标题】Spark:读取文本文件后的重新分区策略【英文标题】:Spark: Repartition strategy after reading text file 【发布时间】:2015-03-23 12:11:55 【问题描述】:

我以这种方式启动了我的集群:

/usr/lib/spark/bin/spark-submit --class MyClass --master yarn-cluster--num-executors 3 --driver-memory 10g --executor-memory 10g --executor-cores 4 /path/to/jar.jar

我做的第一件事是阅读一个大文本文件,并计算它:

val file = sc.textFile("/path/to/file.txt.gz")
println(file.count())

执行此操作时,我看到只有一个节点实际上正在读取文件并执行计数(因为我只看到一个任务)。这是预期的吗?我应该在之后重新分区我的 RDD,或者当我使用 map reduce 函数时,Spark 会为我做吗?

【问题讨论】:

你的“defaultMinPartitions”是什么?正如文档明确指出的那样, textFile 采用可选的分区数参数,默认为该参数。 我的 defaultMinPartitions 大于一。看来我不能强制指定数量的分区,因为它只是一个文本文件...正在运行.... val file = sc.textFile("/path/to/file.txt.gz",8) println(file.partitions.length) 返回 1 嗯,它必须在一个地方进行读取,因为那本质上是连续的。但我不明白为什么如果它不做 something 会有那个可选参数。 我明白了......所以因为 count 没有多大作用,它只保留了一个工人。但是,如果我运行 map 或 reduce,它应该开始传播数据集吗? 不知道,抱歉,但我猜应该。 【参考方案1】:

看起来您正在处理一个 gzip 压缩文件。

引用my answer here:

我认为您在 gzip 压缩文件中遇到了一个相当典型的问题,即它们无法并行加载。更具体地说,单个 gzip 压缩文件不能由多个任务并行加载,因此 Spark 将使用 1 个任务加载它,从而为您提供一个具有 1 个分区的 RDD。

您需要在加载 RDD 后显式重新分区,以便更多任务可以在其上并行运行。

例如:

val file = sc.textFile("/path/to/file.txt.gz").repartition(sc.defaultParallelism * 3)
println(file.count())

关于您问题的 cmets,设置 minPartitions 的原因在这里没有帮助是因为 a gzipped file is not splittable,所以 Spark 将始终使用 1 个任务来读取文件。

如果您在读取常规文本文件或使用 bzip2 等可拆分压缩格式压缩的文件时设置 minPartitions,您会看到 Spark 实际上会并行部署该数量的任务(最多为内核数量)在您的集群中可用)来读取文件。

【讨论】:

谢谢!那么你会推荐 bzip2 而不是 gzip 吗?如果我经常加载该文件,我应该如何优化每次运行? @Stephane - 这取决于传入的数据量以及集群花费多少时间重新分区数据。单个 gzip 文件可能没问题。如果一个文件太大,您可能还可以使用多个 gzip 压缩文件(即在压缩之前拆分),因为每个 gzip 文件可以并行加载到同一个 RDD 中(每个文件一个任务)。这可能是阻力最小的路径。 非常非常有趣,谢谢!所以 .gz.001 拆分文件或 bzip2 ......我将尝试两者!我认为是的,最大的瓶颈是第一次重新分区,所以如果我设法在文件即将到来时对其进行拆分,它可能会为我节省一点时间 @Stephane,你知道为什么存在这个限制吗?分发非 gzip 压缩文件的读取似乎并不容易 - 在这两种情况下,您都需要连续读取文件以找出下一条记录的开始位置? @Paul,我还没有尝试过 bzip2,我会告诉你并行阅读是否真的有效。我不知道,如果存档是可拆分的,那么我想你可以并行读取它(块 1 到 n,n+1 到 2n 等......)然后可能会在这里和那里发送一些丢失的字节到确保每个零件都正确成型。我希望这就是 Spark 所做的

以上是关于Spark:读取文本文件后的重新分区策略的主要内容,如果未能解决你的问题,请参考以下文章

为啥在 Spark 中重新分区比 partitionBy 快?

Spark中的最佳重新分区方式

重新分区分区数据

Spark2.4.3 中方法不存在错误导致重新分区失败

PySpark 重新分区 RDD 元素

Spark 重新分区执行器