如何在使用 Spark 读取时将数据分配到 X 分区?

Posted

技术标签:

【中文标题】如何在使用 Spark 读取时将数据分配到 X 分区?【英文标题】:How to distribute data into X partitions on read with Spark? 【发布时间】:2021-09-21 18:24:01 【问题描述】:

我正在尝试使用 Spark DF 从 Hive 读取数据,并将其分配到特定的可配置数量的分区中(与内核数量相关)。我的工作非常简单,它不包含任何连接或聚合。我读过spark.sql.shuffle.partitions 属性,但文档说:

配置在为联接或聚合打乱数据时要使用的分区数。

这是否意味着配置此属性对我来说无关紧要?或者读取操作是否被视为洗牌?如果没有,有什么替代方案?重新分区和合并似乎有点矫枉过正。

【问题讨论】:

【参考方案1】:

为了验证我对您的问题的理解,您希望增加读取数据后立即创建的 rdd/dataframe 中的分区数。

在这种情况下,您所追求的属性是spark.sql.files.maxPartitionBytes,它控制可以在最大分区中推送的最大数据(请参阅https://spark.apache.org/docs/2.4.0/sql-performance-tuning.html) 默认值为 128 MB,可以覆盖它以提高并行度。

【讨论】:

我想澄清两个注意事项: 1. 我从 Hive 读取数据。 files 属性仍然相关吗? 2.我不想限制分区数量,我想选择确切的分区数量,这样我就可以绝对管理并行度。 如果您使用spark.read.xxx 函数来读取数据,而与函数 xxx 无关,则此属性将起作用。换句话说,如果您读取spark.read.tablespark.read.json 等任何函数,它将起作用。数据帧或 rdd 中的分区不依赖于源上的分区,并且可能与您正在从中读取数据的源不一致,例如,如果您对具有 10 个分区的表进行全表扫描,则您的数据框可能有 10 个以上的分区。 据我所知,在运行时无法确定数据帧的分区数量,旧版本的 spark 具有该选项,但它是静态值而不是动态值。请参阅此链接以获取相同的***.com/questions/39368516/… 我的意思是静态值。就像我放在 spark.sql.shuffle.partitions 属性上的数字一样,但用于阅读。【参考方案2】:

Read 本身并不是一个随机播放。您需要在某个阶段获取数据。

可以使用下面的答案,或者 Spark 的算法在读取时设置分区数。

您没有说明您使用的是 RDD 还是 DF。使用 RDD,您可以设置 num 个分区。使用 DF,一般读取后需要重新分区。

正如您所指出的,在加入或聚合时,您关于控制并行性的观点不太相关。

【讨论】:

所以你是说如果我的工作只是读写,那么 spark.sql.shuffle.partitions 根本不会影响?例如,即使我将此属性配置为 50,我也会将数据读入 200 个分区? Afaik 是的。随机播放...用于连接,聚合

以上是关于如何在使用 Spark 读取时将数据分配到 X 分区?的主要内容,如果未能解决你的问题,请参考以下文章

如果列值取决于文件路径,有没有办法在一次读取多个文件时将文字作为列添加到 spark 数据框中?

如何在 Spark 上平均分配 Beam 任务?

从 HDFS 读取到 Spark

如何在 Databricks 中读取批量 excel 文件数据并加载到 spark 数据框中

PySpark S3 文件读取性能考虑

如何读取 csv 文件并将值分配给 spark scala 中的变量