PySpark DataFrame:找到最接近的值并对 DataFrame 进行切片

Posted

技术标签:

【中文标题】PySpark DataFrame:找到最接近的值并对 DataFrame 进行切片【英文标题】:PySpark DataFrame: Find closest value and slice the DataFrame 【发布时间】:2019-03-27 18:01:16 【问题描述】:

所以,我已经进行了足够的研究,但还没有找到可以解决我想做的事情的帖子。

我有一个 PySpark DataFrame my_df,它是 sorted by value column-

+----+-----+                                                                    
|name|value|
+----+-----+
|   A|   30|
|   B|   25|
|   C|   20|
|   D|   18|
|   E|   18|
|   F|   15|
|   G|   10|
+----+-----+

value 列中所有计数的总和等于136。我想获取combined values >= x% of 136 的所有行。在此示例中,假设为 x=80。然后target sum = 0.8*136 = 108.8。因此,新的 DataFrame 将包含所有具有combined value >= 108.8 的行。

在我们的示例中,这将归结为 D 行(因为组合值高达 D = 30+25+20+18 = 93)。

但是,困难的部分是我还想包含紧随其后的具有重复值的行。在这种情况下,我还想包含行 E,因为它与行 D 具有相同的值,即 18

我想通过给出一个百分比x 变量来分割my_df,例如如上所述的80。新的 DataFrame 应包含以下行-

+----+-----+                                                                    
|name|value|
+----+-----+
|   A|   30|
|   B|   25|
|   C|   20|
|   D|   18|
|   E|   18|
+----+-----+

我可以在这里做的一件事是遍历 DataFrame (which is ~360k rows),但我想这违背了 Spark 的目的。

这里有我想要的简洁功能吗?

【问题讨论】:

添加了更好的描述 你能把你使用的代码分享给sortDataFrame吗?它是基于value 的吗?还是valuename value排序 【参考方案1】:

使用 pyspark SQL 函数简洁地做到这一点。

result = my_df.filter(my_df.value > target).select(my_df.name,my_df.value)
result.show()

编辑:基于 OP 的问题编辑 - 计算运行总和并获取行,直到达到目标值。请注意,这将导致最多 D 行,而不是 E..这似乎是一个奇怪的要求。

from pyspark.sql import Window
from pyspark.sql import functions as f

# Total sum of all `values`
target = (my_df.agg(sum("value")).collect())[0][0]

w = Window.orderBy(my_df.name) #Ideally this should be a column that specifies ordering among rows
running_sum_df = my_df.withColumn('rsum',f.sum(my_df.value).over(w))
running_sum_df.filter(running_sum_df.rsum <= 0.8*target)

【讨论】:

很抱歉我忘了说我还需要做一些其他的操作,请查看更新 在回答完问题后完全更改问题是不公平的。 我明白了。谢谢你的回答。我仍然需要找出一个解决重复值的解决方案。【参考方案2】:

您的要求非常严格,因此很难为您的问题制定有效的解决方案。不过,这里有一种方法:

首先计算value 列的累积和和总和,并使用您指定的目标条件百分比过滤DataFrame。我们称这个结果为df_filtered

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.orderBy(f.col("value").desc(), "name").rangeBetween(Window.unboundedPreceding, 0)
target = 0.8

df_filtered = df.withColumn("cum_sum", f.sum("value").over(w))\
    .withColumn("total_sum", f.sum("value").over(Window.partitionBy()))\
    .where(f.col("cum_sum") <= f.col("total_sum")*target)

df_filtered.show()
#+----+-----+-------+---------+
#|name|value|cum_sum|total_sum|
#+----+-----+-------+---------+
#|   A|   30|     30|      136|
#|   B|   25|     55|      136|
#|   C|   20|     75|      136|
#|   D|   18|     93|      136|
#+----+-----+-------+---------+

然后在value 列的原始数据上加入这个过滤的DataFrame。由于您的 DataFrame 已经按 value 排序,因此最终输出将包含您想要的行。

df.alias("r")\
    .join(
    df_filtered.alias('l'),
    on="value"
).select("r.name", "r.value").sort(f.col("value").desc(), "name").show()
#+----+-----+
#|name|value|
#+----+-----+
#|   A|   30|
#|   B|   25|
#|   C|   20|
#|   D|   18|
#|   E|   18|
#+----+-----+

total_sumcum_sum 列是 calculated using a Window function。

窗口w 按降序排列value 列,然后是name 列。 name 列用于打破关系 - 如果没有它,CD 两行的 111 = 75+18+18 的累积总和将相同,并且您会在过滤器中错误地丢失它们。

w = Window\                                     # Define Window
    .orderBy(                                   # This will define ordering
        f.col("value").desc(),                  # First sort by value descending
        "name"                                  # Sort on name second
    )\
    .rangeBetween(Window.unboundedPreceding, 0) # Extend back to beginning of window

rangeBetween(Window.unboundedPreceding, 0) 指定窗口应包括当前行之前的所有行(由orderBy 定义)。这就是使它成为累积总和的原因。

【讨论】:

没错,但问题的表述方式意味着需要包含所有具有 18 行的行。 (除非我遗漏了什么) following 表示排序顺序。由于 DataFrame 是按值排序的,因此具有相同值的行将始终紧随其后。更高的值都不会出现在 DataFrame 的其他地方。 @VamsiPrabhala 紧随其后,我并不是说只有 1 行 - 它可以是具有相同值的多行。既然是按value排序的,那么它们肯定是相邻的。 @kev 我添加了对 Window 的解释,其中包含一个帖子的链接,该帖子显示了如何在 pyspark 中进行累积和。 DataFrame 是 not 有序的 - DataFrame 本质上是无序的,除非您明确指定排序。 @kev 不认为 Spark DataFrames 有任何顺序。即使它看起来排序,也不能保证它在引擎盖下是这样的。 Spark 将数据分布在多台机器上,这允许对它们进行并行操作。由于每个执行者不必担心顺序,它可以独立地处理自己的部分数据。当您需要订单时,您必须指定 如何 进行排序,然后 Spark 将根据需要在 executor 之间打乱数据。所以声明 df is already sorted by value 是假的。

以上是关于PySpark DataFrame:找到最接近的值并对 DataFrame 进行切片的主要内容,如果未能解决你的问题,请参考以下文章

如何在没有直接连接列的两个数据框之间找到最接近的匹配行?

PySpark:转换DataFrame中给定列的值

C++:查找数组中最接近的值

在jquery中找到最接近的值

用列表 Pyspark Dataframe 中的值替换 NA

在熊猫数据框中使用 np.isclose 报告最接近的值