Pyspark 数据框 OrderBy 分区级别还是整体?
Posted
技术标签:
【中文标题】Pyspark 数据框 OrderBy 分区级别还是整体?【英文标题】:Pyspark dataframe OrderBy partition level or overall? 【发布时间】:2019-04-26 03:22:13 【问题描述】:当我对 pyspark 数据框执行 orderBy 时,它是否会对所有分区中的数据(即整个结果)进行排序?还是分区级别的排序? 如果是后者,那么任何人都可以建议如何跨数据执行 orderBy 吗? 我在最后有一个订单
我当前的代码:
def extract_work(self, days_to_extract):
source_folders = self.work_folder_provider.get_work_folders(s3_source_folder=self.work_source,
warehouse_ids=self.warehouse_ids,
days_to_extract=days_to_extract)
source_df = self._load_from_s3(source_folders)
# Partition and de-dupe the data-frame retaining latest
source_df = self.data_frame_manager.partition_and_dedupe_data_frame(source_df,
partition_columns=['binScannableId', 'warehouseId'],
sort_key='cameraCaptureTimestampUtc',
desc=True)
# Filter out anything that does not qualify for virtual count.
source_df = self._virtual_count_filter(source_df)
history_folders = self.work_folder_provider.get_history_folders(s3_history_folder=self.history_source,
days_to_extract=days_to_extract)
history_df = self._load_from_s3(history_folders)
# Filter out historical items
if history_df:
source_df = source_df.join(history_df, 'binScannableId', 'leftanti')
else:
self.logger.error("No History was found")
# Sort by defectProbability
source_df = source_df.orderBy(desc('defectProbability'))
return source_df
def partition_and_dedupe_data_frame(data_frame, partition_columns, sort_key, desc):
if desc:
window = Window.partitionBy(partition_columns).orderBy(F.desc(sort_key))
else:
window = Window.partitionBy(partition_columns).orderBy(F.asc(sort_key))
data_frame = data_frame.withColumn('rank', F.rank().over(window)).filter(F.col('rank') == 1).drop('rank')
return data_frame
def _virtual_count_filter(self, source_df):
df = self._create_data_frame()
for key in self.virtual_count_thresholds.keys():
temp_df = source_df.filter((source_df['expectedQuantity'] == key) & (source_df['defectProbability'] > self.virtual_count_thresholds[key]))
df = df.union(temp_df)
return df
当我执行 df.explain() 时,我得到以下信息-
Physical Plan == *Sort [defectProbability#2 DESC NULLS LAST], true, 0 +- Exchange rangepartitioning(defectProbability#2 DESC NULLS LAST, 25) +- *Project [expectedQuantity#0, cameraCaptureTimestampUtc#1, defectProbability#2, binScannableId#3, warehouseId#4, defectResult#5] +- *Filter ((isnotnull(rank#35) && (rank#35 = 1)) && (((((((expectedQuantity#0 = 0) && (defectProbability#2 > 0.99)) || ((expectedQuantity#0 = 1) && (defectProbability#2 > 0.98))) || ((expectedQuantity#0 = 2) && (defectProbability#2 > 0.99))) || ((expectedQuantity#0 = 3) && (defectProbability#2 > 0.99))) || ((expectedQuantity#0 = 4) && (defectProbability#2 > 0.99))) || ((expectedQuantity#0 = 5) && (defectProbability#2 > 0.99)))) +- Window [rank(cameraCaptureTimestampUtc#1) windowspecdefinition(binScannableId#3, warehouseId#4, cameraCaptureTimestampUtc#1 DESC NULLS LAST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#35], [binScannableId#3, warehouseId#4], [cameraCaptureTimestampUtc#1 DESC NULLS LAST] +- *Sort [binScannableId#3 ASC NULLS FIRST, warehouseId#4 ASC NULLS FIRST, cameraCaptureTimestampUtc#1 DESC NULLS LAST], false, 0 +- Exchange hashpartitioning(binScannableId#3, warehouseId#4, 25) +- Union :- Scan ExistingRDD[expectedQuantity#0,cameraCaptureTimestampUtc#1,defectProbability#2,binScannableId#3,warehouseId#4,defectResult#5] +- *FileScan json [expectedQuantity#13,cameraCaptureTimestampUtc#14,defectProbability#15,binScannableId#16,warehouseId#17,defectResult#18] Batched: false, Format: JSON, Location: InMemoryFileIndex[s3://vbi-autocount-chunking-prod-nafulfillment2/TPA1/2019/04/25/12/vbi-ac-chunk..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<expectedQuantity:int,cameraCaptureTimestampUtc:string,defectProbability:double,binScannabl...
【问题讨论】:
我想在我的代码中添加它,我将数据框划分为两列并删除重复项,然后再按第三列对数据框进行排序。我编写了一个测试来验证数据框中的项目是否已排序,而它们不是。一开始以为是测试有问题,现在好像还好 我从以下答案的评论中添加了您的代码。在您按某些列显式分区和数据排序的代码中,这只会导致分区级别排序。 其实我在上面的分区之后再做一个排序 请也添加这个问题(点击edit按钮)。 完成。感谢您的观看。 【参考方案1】:orderBy()
是一个“wide transformation”,这意味着 Spark 需要触发一个“shuffle”和“stage splits(1 个分区到多个输出分区) )" 从而检索分布在集群中的所有分区拆分,以在此处执行orderBy()
。
如果您查看解释计划,它有一个重新分区指示器,其中写入了默认的 200 个输出分区(spark.sql.shuffle.partitions 配置)执行后到磁盘。这告诉您在执行 Spark“action”时会发生“范围的转换”,也就是“shuffle”。
其他“广泛的转换”包括:distinct(), groupBy(), and join() => *sometimes*
from pyspark.sql.functions import desc
df = spark.range(10).orderBy(desc("id"))
df.show()
df.explain()
+---+
| id|
+---+
| 9|
| 8|
| 7|
| 6|
| 5|
| 4|
| 3|
| 2|
| 1|
| 0|
+---+
== Physical Plan ==
*(2) Sort [id#6L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#6L DESC NULLS LAST, 200)
+- *(1) Range (0, 10, step=1, splits=8)
【讨论】:
感谢您的回复,我已将数据框明确划分为 2 列以删除重复项。然后我尝试按第三列排序。我看到数据没有在所有数据框中进行排序。您需要代码 sn-p 吗? 当然...请提供更多代码...您也可以使用.dropDuplicates()
删除重复行...不知道为什么要在这里分区
我有一个非常自定义的 drop 副本。看到这个问题***.com/questions/55660085/…以上是关于Pyspark 数据框 OrderBy 分区级别还是整体?的主要内容,如果未能解决你的问题,请参考以下文章