Pyspark Parquet - 重新分区后排序

Posted

技术标签:

【中文标题】Pyspark Parquet - 重新分区后排序【英文标题】:Pyspark Parquet - sort after repartition 【发布时间】:2020-01-03 12:26:57 【问题描述】:

我在拼花地板中对输出进行排序时遇到问题。我正在从另一个完全随机且非常大的镶木地板加载数据(数千行 - 重要事实)。获取有关电话用户和桌面用户的信息并计算他们的查询 + 获取查询总数。

寻找这样的表格(按总数排序):

query | desktop_count | phone_count |    total
------|---------------|-------------|------------
query1|      123      |     321     |     444
query2|      23       |     32      |     55
query3|      12       |     21      |     33

问题是 - 每当我想使用任何类型的功能时,它都会分成几部分,然后 repartition(1) 将它们连接在一起但没有排序。有没有什么方法可以将 20 块镶木地板拼成一个但已分类? 如果需要任何信息,请询问。

代码(尝试了更多的重新分区):

def computeMSQueries(self):

        pq = self.pyspark.read.parquet(*self.pySearchPaths)

        desktop_df = pq.filter("hwType == 'DESKTOP'").groupby("query").count().repartition(1).withColumnRenamed('count','desktop_count')
        phone_df = pq.filter("hwType == 'PHONE'").groupby("query").count().repartition(1).withColumnRenamed('count','phone_count')

        res = desktop_df.join(phone_df, on=["query"], how='fullouter').fillna(0).withColumn("total", col("desktop_count")+col("phone_count")).orderBy(col('total').desc())


        return res



self.computeMSQueries().repartition(1).write.parquet(outputDir)

【问题讨论】:

尝试使用coalesce(1)而不是repartition来保持顺序。 完美。谢谢! 【参考方案1】:

加入时您应该避免使用repartition(),因为与coalesce() 相比,这非常昂贵,因为coalesce() 可以避免数据移动。

另一件事是repartition()可以增加/减少分区数,但coalesce()只能减少分区数。这就是数据被排序而不是打乱的原因。

此外,coalesce() 使用现有分区来最大程度地减少混洗的数据量。 repartition() 创建新分区并进行完全洗牌。 coalesce 导致具有不同数据量的分区(有时分区具有很大不同的大小),并且 repartition 导致大小大致相等的分区。

所以你可以像下面这样使用,只需将 repartition 替换为 coalesce 就可以了:

def computeMSQueries(self):

        pq = self.pyspark.read.parquet(*self.pySearchPaths)

        desktop_df = pq.filter("hwType == 'DESKTOP'").groupby("query").count().coalesce(1).withColumnRenamed('count','desktop_count')
        phone_df = pq.filter("hwType == 'PHONE'").groupby("query").count().coalesce(1).withColumnRenamed('count','phone_count')

        res = desktop_df.join(phone_df, on=["query"], how='fullouter').fillna(0).withColumn("total", col("desktop_count")+col("phone_count")).orderBy(col('total').desc())


        return res



self.computeMSQueries().coalesce(1).write.parquet(outputDir)

【讨论】:

以上是关于Pyspark Parquet - 重新分区后排序的主要内容,如果未能解决你的问题,请参考以下文章

重新分区 pyspark 数据帧失败以及如何避免初始分区大小

读取 PySpark 中的所有分区 parquet 文件

用修改后的 PySpark DataFrame 覆盖现有 Parquet 数据集

java - 如何在类似于pyspark的java分区中写入parquet文件?

pyspark - 分区数据的计算(使用“附加”模式创建)慢

使用 pyspark 同时编写 parquet 文件