Pyspark Parquet - 重新分区后排序
Posted
技术标签:
【中文标题】Pyspark Parquet - 重新分区后排序【英文标题】:Pyspark Parquet - sort after repartition 【发布时间】:2020-01-03 12:26:57 【问题描述】:我在拼花地板中对输出进行排序时遇到问题。我正在从另一个完全随机且非常大的镶木地板加载数据(数千行 - 重要事实)。获取有关电话用户和桌面用户的信息并计算他们的查询 + 获取查询总数。
寻找这样的表格(按总数排序):
query | desktop_count | phone_count | total
------|---------------|-------------|------------
query1| 123 | 321 | 444
query2| 23 | 32 | 55
query3| 12 | 21 | 33
问题是 - 每当我想使用任何类型的功能时,它都会分成几部分,然后 repartition(1) 将它们连接在一起但没有排序。有没有什么方法可以将 20 块镶木地板拼成一个但已分类? 如果需要任何信息,请询问。
代码(尝试了更多的重新分区):
def computeMSQueries(self):
pq = self.pyspark.read.parquet(*self.pySearchPaths)
desktop_df = pq.filter("hwType == 'DESKTOP'").groupby("query").count().repartition(1).withColumnRenamed('count','desktop_count')
phone_df = pq.filter("hwType == 'PHONE'").groupby("query").count().repartition(1).withColumnRenamed('count','phone_count')
res = desktop_df.join(phone_df, on=["query"], how='fullouter').fillna(0).withColumn("total", col("desktop_count")+col("phone_count")).orderBy(col('total').desc())
return res
self.computeMSQueries().repartition(1).write.parquet(outputDir)
【问题讨论】:
尝试使用coalesce(1)
而不是repartition来保持顺序。
完美。谢谢!
【参考方案1】:
加入时您应该避免使用repartition()
,因为与coalesce()
相比,这非常昂贵,因为coalesce()
可以避免数据移动。
另一件事是repartition()
可以增加/减少分区数,但coalesce()
只能减少分区数。这就是数据被排序而不是打乱的原因。
此外,coalesce()
使用现有分区来最大程度地减少混洗的数据量。 repartition()
创建新分区并进行完全洗牌。 coalesce 导致具有不同数据量的分区(有时分区具有很大不同的大小),并且 repartition 导致大小大致相等的分区。
所以你可以像下面这样使用,只需将 repartition
替换为 coalesce
就可以了:
def computeMSQueries(self):
pq = self.pyspark.read.parquet(*self.pySearchPaths)
desktop_df = pq.filter("hwType == 'DESKTOP'").groupby("query").count().coalesce(1).withColumnRenamed('count','desktop_count')
phone_df = pq.filter("hwType == 'PHONE'").groupby("query").count().coalesce(1).withColumnRenamed('count','phone_count')
res = desktop_df.join(phone_df, on=["query"], how='fullouter').fillna(0).withColumn("total", col("desktop_count")+col("phone_count")).orderBy(col('total').desc())
return res
self.computeMSQueries().coalesce(1).write.parquet(outputDir)
【讨论】:
以上是关于Pyspark Parquet - 重新分区后排序的主要内容,如果未能解决你的问题,请参考以下文章
重新分区 pyspark 数据帧失败以及如何避免初始分区大小
用修改后的 PySpark DataFrame 覆盖现有 Parquet 数据集
java - 如何在类似于pyspark的java分区中写入parquet文件?