Pyspark 数据帧重新分区将所有数据放在一个分区中

Posted

技术标签:

【中文标题】Pyspark 数据帧重新分区将所有数据放在一个分区中【英文标题】:Pyspark dataframe repartitioning puts all data in one partition 【发布时间】:2018-01-08 07:01:14 【问题描述】:

我有一个架构如下的数据框:

root
 |-- category_id: string (nullable = true)
 |-- article_title: string (nullable = true)

看起来像这样的数据:

+-----------+--------------------+
|category_id|     articletitle   |
+-----------+--------------------+
|       1000|HP EliteOne 800 G...|
|       1000|ASUS  EB1501P ATM...|
|       1000|HP EliteOne 800 G...|
|          1|ASUS R557LA-XO119...|
|          1|HP EliteOne 800 G...|
+-----------+--------------------+

只有两个不同的category_id 1000 和 1。

我想通过category_idmapPartition 对每个分区进行重新分区。

p_df = df.repartition(2, "category_id")
p_df.rdd.mapPartitionsWithIndex(some_func)

但是数据没有得到正确的分区,预期的结果是每个 mappartition 将只有一个category_id 的数据。但实际结果是一个分区得到0条记录,而另一个分区得到所有记录。

为什么会发生这种情况以及如何解决这个问题?

已经有一个question 介绍了 spark 分区器的工作原理。我的问题不同,因为答案仅包含对分区器如何工作的解释,但我的问题是关于为什么会发生这种情况(已经回答)以及如何解决它。

【问题讨论】:

您是如何得出一个分区为空而另一个分区有所有记录的结论的?可以添加p_df.withColumn("partition" , spark_partition_id()).show()的输出吗? 没关系。它为 Spark 1.6 提供了准确的分区,但为 Spark 2.2 中的所有记录提供了相同的分区 ID。 【参考方案1】:

您正确使用了repartitionmapPartitionsWithIndex 函数。

如果你将explain函数应用为

df.repartition(2, "category_id").explain()

您将看到以下输出,清楚地表明它已重新分区为两个分区。

== Physical Plan ==
Exchange hashpartitioning(category_id#0L, 2)
+- Scan ExistingRDD[category_id#0L,articletitle#1L]

现在真正的罪魁祸首是 hashPartitioning,它将 1、10、1000、100000 ... 视为与分区号相同的哈希 2

解决方案是将分区数更改为 3 或更多,

category_id 1000 更改为其他值。

【讨论】:

【参考方案2】:

@Ramesh Maharjan 在上述答案中解释了重新分区将所有数据放在一个分区中的原因。更多关于哈希分区here

我能够通过使用自定义分区器将数据转到不同的分区器。我将rdd变成pairRdd格式(category_id,row)并使用partitionBy方法给出分区数和custom_partitioner。

    categories = input_df.select("category_id").distinct().rdd.map(lambda r: r.category_id).collect()
    cat_idx = dict([(cat, idx) for idx, cat in enumerate(categories)])

    def category_partitioner(cid):
        return cat_idx[cid]

【讨论】:

以上是关于Pyspark 数据帧重新分区将所有数据放在一个分区中的主要内容,如果未能解决你的问题,请参考以下文章

text Pyspark数据帧重新分区

将 PySpark 数据帧写入分区 Hive 表

PySpark - 分区中覆盖的数据

PySpark 根据特定列重新分区

如何从 BigQuery 读取分区表到 Spark 数据帧(在 PySpark 中)

如何使用 pyspark 管理跨集群的数据帧的物理数据放置?