在 PySpark 中读取文本文件时有没有办法控制分区数
Posted
技术标签:
【中文标题】在 PySpark 中读取文本文件时有没有办法控制分区数【英文标题】:Is there a way to control the number of partitions when reading a text file in PySpark 【发布时间】:2016-03-19 19:45:14 【问题描述】:我正在使用 PySpark 中的以下命令读取文本文件
rating_data_raw = sc.textFile("/<path_to_csv_file>.csv")
有没有办法指定 RDD rating_data_raw 应该分成多少个分区?我想指定大量的分区以获得更大的并发性。
【问题讨论】:
可以添加python标签吗?这种方式将为您的代码添加高亮 您可以在读取文件时声明最小分区数,请参阅此处的文档 - spark.apache.org/docs/latest/api/python/pyspark.html 【参考方案1】:正如其他用户所说,您可以通过在textFile 的可选参数minPartitions
中设置来设置在读取文件时将创建的最小分区数。
rating_data_raw = sc.textFile("/<path_to_csv_file>.csv", minPartitions=128)
另一种方法是使用repartition或coalesce,如果需要减少分区数可以使用coalesce
,否则可以使用repartition
。
rating_data_raw = sc.textFile("/<path_to_csv_file>.csv").repartition(128)
【讨论】:
【参考方案2】:也可以读取 .csv 文件,然后使用 df 到 RDD 转换检查分区。我在下面留下一个示例结构。
dataset = spark.read.csv("data.csv", header=True, inferSchema='True')
colsDrop = ("data_index", "_c17", "song_title", "artist")
df = dataset.drop(*colsDrop)
rdd = sc.parallelize(df.collect()).partitionBy(8)
这里 .partitionBy() 允许您控制 RDD 对象的分区号。也可以使用 .getNumPartition() 方法找出这些数字。
唯一需要注意的是,在 CPU 上分配的分区数多于线程数不会给我们带来速度提升。
例如,我的 CPU 中的线程数是 8,您可以在下面看到一个示例时间分布。
如你所见,我在 8 个分区后无法加快速度。
【讨论】:
另外,如果你想通过 df 或 sql 使用 RDD API,默认分区号为 1。trainingData=df.rdd.map(lambda x:(Vectors.dense(x[0:-1]), x[-1]))
print("Partition: ",trainingData.getNumPartitions())
响应:分区:1以上是关于在 PySpark 中读取文本文件时有没有办法控制分区数的主要内容,如果未能解决你的问题,请参考以下文章
如何在 pyspark 数据框中读取 csv 文件时读取选定的列?