Pyspark 数据帧重新分区将所有数据放在一个分区中
Posted
技术标签:
【中文标题】Pyspark 数据帧重新分区将所有数据放在一个分区中【英文标题】:Pyspark dataframe repartitioning puts all data in one partition 【发布时间】:2018-01-08 07:01:14 【问题描述】:我有一个架构如下的数据框:
root
|-- category_id: string (nullable = true)
|-- article_title: string (nullable = true)
看起来像这样的数据:
+-----------+--------------------+
|category_id| articletitle |
+-----------+--------------------+
| 1000|HP EliteOne 800 G...|
| 1000|ASUS EB1501P ATM...|
| 1000|HP EliteOne 800 G...|
| 1|ASUS R557LA-XO119...|
| 1|HP EliteOne 800 G...|
+-----------+--------------------+
只有两个不同的category_id
1000 和 1。
我想通过category_id
和mapPartition
对每个分区进行重新分区。
p_df = df.repartition(2, "category_id")
p_df.rdd.mapPartitionsWithIndex(some_func)
但是数据没有得到正确的分区,预期的结果是每个 mappartition 将只有一个category_id
的数据。但实际结果是一个分区得到0条记录,而另一个分区得到所有记录。
为什么会发生这种情况以及如何解决这个问题?
已经有一个question 介绍了 spark 分区器的工作原理。我的问题不同,因为答案仅包含对分区器如何工作的解释,但我的问题是关于为什么会发生这种情况(已经回答)以及如何解决它。
【问题讨论】:
您是如何得出一个分区为空而另一个分区有所有记录的结论的?可以添加p_df.withColumn("partition" , spark_partition_id()).show()
的输出吗?
没关系。它为 Spark 1.6 提供了准确的分区,但为 Spark 2.2 中的所有记录提供了相同的分区 ID。
【参考方案1】:
您正确使用了repartition
和mapPartitionsWithIndex
函数。
如果你将explain
函数应用为
df.repartition(2, "category_id").explain()
您将看到以下输出,清楚地表明它已重新分区为两个分区。
== Physical Plan ==
Exchange hashpartitioning(category_id#0L, 2)
+- Scan ExistingRDD[category_id#0L,articletitle#1L]
现在真正的罪魁祸首是 hashPartitioning,它将 1、10、1000、100000 ... 视为与分区号相同的哈希 2
解决方案是将分区数更改为 3 或更多,
或
将category_id
1000 更改为其他值。
【讨论】:
【参考方案2】:@Ramesh Maharjan 在上述答案中解释了重新分区将所有数据放在一个分区中的原因。更多关于哈希分区here
我能够通过使用自定义分区器将数据转到不同的分区器。我将rdd变成pairRdd格式(category_id,row)并使用partitionBy方法给出分区数和custom_partitioner。
categories = input_df.select("category_id").distinct().rdd.map(lambda r: r.category_id).collect()
cat_idx = dict([(cat, idx) for idx, cat in enumerate(categories)])
def category_partitioner(cid):
return cat_idx[cid]
【讨论】:
以上是关于Pyspark 数据帧重新分区将所有数据放在一个分区中的主要内容,如果未能解决你的问题,请参考以下文章