PySpark DataFrame:找到最接近的值并对 DataFrame 进行切片
Posted
技术标签:
【中文标题】PySpark DataFrame:找到最接近的值并对 DataFrame 进行切片【英文标题】:PySpark DataFrame: Find closest value and slice the DataFrame 【发布时间】:2019-03-27 18:01:16 【问题描述】:所以,我已经进行了足够的研究,但还没有找到可以解决我想做的事情的帖子。
我有一个 PySpark DataFrame my_df
,它是 sorted
by value
column-
+----+-----+
|name|value|
+----+-----+
| A| 30|
| B| 25|
| C| 20|
| D| 18|
| E| 18|
| F| 15|
| G| 10|
+----+-----+
value
列中所有计数的总和等于136
。我想获取combined values >= x% of 136
的所有行。在此示例中,假设为 x=80
。然后target sum = 0.8*136 = 108.8
。因此,新的 DataFrame 将包含所有具有combined value >= 108.8
的行。
在我们的示例中,这将归结为 D
行(因为组合值高达 D = 30+25+20+18 = 93
)。
但是,困难的部分是我还想包含紧随其后的具有重复值的行。在这种情况下,我还想包含行 E
,因为它与行 D
具有相同的值,即 18
。
我想通过给出一个百分比x
变量来分割my_df
,例如如上所述的80
。新的 DataFrame 应包含以下行-
+----+-----+
|name|value|
+----+-----+
| A| 30|
| B| 25|
| C| 20|
| D| 18|
| E| 18|
+----+-----+
我可以在这里做的一件事是遍历 DataFrame (which is ~360k rows)
,但我想这违背了 Spark 的目的。
这里有我想要的简洁功能吗?
【问题讨论】:
添加了更好的描述 你能把你使用的代码分享给sort
DataFrame吗?它是基于value
的吗?还是value
和name
?
按value
排序
【参考方案1】:
使用 pyspark SQL 函数简洁地做到这一点。
result = my_df.filter(my_df.value > target).select(my_df.name,my_df.value)
result.show()
编辑:基于 OP 的问题编辑 - 计算运行总和并获取行,直到达到目标值。请注意,这将导致最多 D 行,而不是 E..这似乎是一个奇怪的要求。
from pyspark.sql import Window
from pyspark.sql import functions as f
# Total sum of all `values`
target = (my_df.agg(sum("value")).collect())[0][0]
w = Window.orderBy(my_df.name) #Ideally this should be a column that specifies ordering among rows
running_sum_df = my_df.withColumn('rsum',f.sum(my_df.value).over(w))
running_sum_df.filter(running_sum_df.rsum <= 0.8*target)
【讨论】:
很抱歉我忘了说我还需要做一些其他的操作,请查看更新 在回答完问题后完全更改问题是不公平的。 我明白了。谢谢你的回答。我仍然需要找出一个解决重复值的解决方案。【参考方案2】:您的要求非常严格,因此很难为您的问题制定有效的解决方案。不过,这里有一种方法:
首先计算value
列的累积和和总和,并使用您指定的目标条件百分比过滤DataFrame。我们称这个结果为df_filtered
:
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.orderBy(f.col("value").desc(), "name").rangeBetween(Window.unboundedPreceding, 0)
target = 0.8
df_filtered = df.withColumn("cum_sum", f.sum("value").over(w))\
.withColumn("total_sum", f.sum("value").over(Window.partitionBy()))\
.where(f.col("cum_sum") <= f.col("total_sum")*target)
df_filtered.show()
#+----+-----+-------+---------+
#|name|value|cum_sum|total_sum|
#+----+-----+-------+---------+
#| A| 30| 30| 136|
#| B| 25| 55| 136|
#| C| 20| 75| 136|
#| D| 18| 93| 136|
#+----+-----+-------+---------+
然后在value
列的原始数据上加入这个过滤的DataFrame。由于您的 DataFrame 已经按 value
排序,因此最终输出将包含您想要的行。
df.alias("r")\
.join(
df_filtered.alias('l'),
on="value"
).select("r.name", "r.value").sort(f.col("value").desc(), "name").show()
#+----+-----+
#|name|value|
#+----+-----+
#| A| 30|
#| B| 25|
#| C| 20|
#| D| 18|
#| E| 18|
#+----+-----+
total_sum
和 cum_sum
列是 calculated using a Window
function。
窗口w
按降序排列value
列,然后是name
列。 name
列用于打破关系 - 如果没有它,C
和 D
两行的 111 = 75+18+18
的累积总和将相同,并且您会在过滤器中错误地丢失它们。
w = Window\ # Define Window
.orderBy( # This will define ordering
f.col("value").desc(), # First sort by value descending
"name" # Sort on name second
)\
.rangeBetween(Window.unboundedPreceding, 0) # Extend back to beginning of window
rangeBetween(Window.unboundedPreceding, 0)
指定窗口应包括当前行之前的所有行(由orderBy
定义)。这就是使它成为累积总和的原因。
【讨论】:
没错,但问题的表述方式意味着需要包含所有具有 18 行的行。 (除非我遗漏了什么) following 表示排序顺序。由于 DataFrame 是按值排序的,因此具有相同值的行将始终紧随其后。更高的值都不会出现在 DataFrame 的其他地方。 @VamsiPrabhala 紧随其后,我并不是说只有 1 行 - 它可以是具有相同值的多行。既然是按value
排序的,那么它们肯定是相邻的。
@kev 我添加了对 Window 的解释,其中包含一个帖子的链接,该帖子显示了如何在 pyspark 中进行累积和。 DataFrame 是 not 有序的 - DataFrame 本质上是无序的,除非您明确指定排序。
@kev 不认为 Spark DataFrames 有任何顺序。即使它看起来排序,也不能保证它在引擎盖下是这样的。 Spark 将数据分布在多台机器上,这允许对它们进行并行操作。由于每个执行者不必担心顺序,它可以独立地处理自己的部分数据。当您需要订单时,您必须指定 如何 进行排序,然后 Spark 将根据需要在 executor 之间打乱数据。所以声明 df
is already sorted by value 是假的。以上是关于PySpark DataFrame:找到最接近的值并对 DataFrame 进行切片的主要内容,如果未能解决你的问题,请参考以下文章