在 PySpark 中读取文本文件时有没有办法控制分区数

Posted

技术标签:

【中文标题】在 PySpark 中读取文本文件时有没有办法控制分区数【英文标题】:Is there a way to control the number of partitions when reading a text file in PySpark 【发布时间】:2016-03-19 19:45:14 【问题描述】:

我正在使用 PySpark 中的以下命令读取文本文件

rating_data_raw = sc.textFile("/<path_to_csv_file>.csv")

有没有办法指定 RDD rating_data_raw 应该分成多少个分区?我想指定大量的分区以获得更大的并发性。

【问题讨论】:

可以添加python标签吗?这种方式将为您的代码添加高亮 您可以在读取文件时声明最小分区数,请参阅此处的文档 - spark.apache.org/docs/latest/api/python/pyspark.html 【参考方案1】:

正如其他用户所说,您可以通过在textFile 的可选参数minPartitions 中设置来设置在读取文件时将创建的最小分区数。

rating_data_raw = sc.textFile("/<path_to_csv_file>.csv", minPartitions=128)

另一种方法是使用repartition或coalesce,如果需要减少分区数可以使用coalesce,否则可以使用repartition

rating_data_raw = sc.textFile("/<path_to_csv_file>.csv").repartition(128)

【讨论】:

【参考方案2】:

也可以读取 .csv 文件,然后使用 df 到 RDD 转换检查分区。我在下面留下一个示例结构。

dataset = spark.read.csv("data.csv", header=True, inferSchema='True')
colsDrop = ("data_index", "_c17", "song_title", "artist")
df = dataset.drop(*colsDrop)
rdd = sc.parallelize(df.collect()).partitionBy(8)

这里 .partitionBy() 允许您控制 RDD 对象的分区号。也可以使用 .getNumPartition() 方法找出这些数字。

唯一需要注意的是,在 CPU 上分配的分区数多于线程数不会给我们带来速度提升。

例如,我的 CPU 中的线程数是 8,您可以在下面看到一个示例时间分布。

如你所见,我在 8 个分区后无法加快速度。

【讨论】:

另外,如果你想通过 df 或 sql 使用 RDD API,默认分区号为 1。trainingData=df.rdd.map(lambda x:(Vectors.dense(x[0:-1]), x[-1]))print("Partition: ",trainingData.getNumPartitions())响应:分区:1

以上是关于在 PySpark 中读取文本文件时有没有办法控制分区数的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 在文本文件中读取为密集向量

如何在 pyspark 数据框中读取 csv 文件时读取选定的列?

删除 RDD、Pyspark 中的停用词

从输入流Java读取时有没有办法超时? [复制]

Pyspark 从 csv 文件中读取 delta/upsert 数据集

PySpark 读取不存在文件时的错误处理