如何在 Spark 中计算过去时间值的“percent_rank”?
Posted
技术标签:
【中文标题】如何在 Spark 中计算过去时间值的“percent_rank”?【英文标题】:How to calculate `percent_rank` on past time values in Spark? 【发布时间】:2017-08-23 08:41:43 【问题描述】:我想计算列 x
的 percent_rank
,但排名应该仅与 过去 值相关 - 由时间戳变量 t
确定。
似乎F.percent_rank()
不接受任何参数并且要获得独立于时间的排名,您将使用percent_rank().over(Window.orderBy("x"))
?!
有没有办法根据时间戳较小的值的数量来获得排名?
预期的结果是这样的
t x perc_rank_win
0 1 0.0
1 3 1.0 # since 3 is largest from [1, 3]
2 5 1.0 # since 5 is largest from [1, 3, 5]
3 4 0.66 # since values are [1, 3, 4!, 5]
4 2 0.25 # since [1, 2!, 3, 4, 5]
【问题讨论】:
【参考方案1】:这是我尝试在窗口分区上使用 collect_set 的另一个解决方法,
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(0,1),(1,3),(2,5),(3,4),(4,2)],['t','x'])
df.show()
+---+---+
| t| x|
+---+---+
| 0| 1|
| 1| 3|
| 2| 5|
| 3| 4|
| 4| 2|
+---+---+
w = Window.orderBy('t')
df = df.withColumn('somecol',F.collect_set('x').over(w))
df.show()
+---+---+---------------+
| t| x| somecol|
+---+---+---------------+
| 0| 1| [1]|
| 1| 3| [1, 3]|
| 2| 5| [1, 5, 3]|
| 3| 4| [1, 5, 3, 4]|
| 4| 2|[1, 5, 2, 3, 4]|
+---+---+---------------+
def pct_rank(s,v):
x=sorted(s)
if len(x) == 1:
return float(0)
else:
pc = float(1)/(len(x)-1)
idx = x.index(v)
return float("0:.2f".format(idx*pc))
pct_udf = F.udf(pct_rank)
df.select("t","x",pct_udf(df['somecol'],df['x']).alias('pct_rank')).show()
+---+---+--------+
| t| x|pct_rank|
+---+---+--------+
| 0| 1| 0.0|
| 1| 3| 1.0|
| 2| 5| 1.0|
| 3| 4| 0.67|
| 4| 2| 0.25|
+---+---+--------+
【讨论】:
【参考方案2】:为了获得滚动的percent_rank()
,您必须能够使用窗口框架定义来对您根本不能的功能进行排名。 (类似这样的w = Window.orderBy('t', 'x').rowsBetween(-sys.maxsize, 0)
)
我找到了一种解决方法,但它涉及一个非常昂贵的笛卡尔连接:
首先让我们创建示例数据框:
import pyspark.sql.functions as psf
from pyspark.sql import HiveContext
hc = HiveContext(sc)
df = hc.createDataFrame(sc.parallelize(zip(range(5), [1,3,5,4,2])), ['t', 'x'])
笛卡尔连接:
df2 = df.groupBy(df.x.alias('x2')).agg(psf.min("t").alias("t2"))
df_cross = df.join(df2).filter("t2 <= t").withColumn("isSup", (df.x > df2.x2).cast("int"))
+---+---+---+---+-----+
| t| x| t2| x2|isSup|
+---+---+---+---+-----+
| 1| 3| 0| 1| 1|
| 2| 5| 0| 1| 1|
| 2| 5| 1| 3| 1|
| 3| 4| 0| 1| 1|
| 3| 4| 1| 3| 1|
| 3| 4| 2| 5| 0|
| 4| 2| 0| 1| 1|
| 4| 2| 1| 3| 0|
| 4| 2| 2| 5| 0|
| 4| 2| 3| 4| 0|
+---+---+---+---+-----+
最后我们按't'、'x'分组:
df_fin = df_cross.groupBy("t", "x").agg(
psf.count("*").alias("count"),
psf.sum("isSup").alias("rank")
).withColumn('pct_rank_win', psf.col("rank")/psf.greatest(psf.col('count') - 1, psf.lit(1)))
+---+---+-----+----+------------------+
| t| x|count|rank| pct_rank_win|
+---+---+-----+----+------------------+
| 0| 1| 1| 0| 0.0|
| 1| 3| 2| 1| 1.0|
| 2| 5| 3| 2| 1.0|
| 3| 4| 4| 2|0.6666666666666666|
| 4| 2| 5| 1| 0.25|
+---+---+-----+----+------------------+
df2
定义中的groupBy('x')
是为了确保密集排名(相同的值将具有相同的排名),如下例所示:
df = hc.createDataFrame(sc.parallelize(zip(range(6), [1,3,3,5,4,2])), ['t', 'x'])
+---+---+-----+----+------------------+
| t| x|count|rank| pct_rank_win|
+---+---+-----+----+------------------+
| 0| 1| 1| 0| 0.0|
| 1| 3| 2| 1| 1.0|
| 2| 3| 2| 1| 1.0|
| 3| 5| 3| 2| 1.0|
| 4| 4| 4| 2|0.6666666666666666|
| 5| 2| 5| 1| 0.25|
+---+---+-----+----+------------------+
【讨论】:
以上是关于如何在 Spark 中计算过去时间值的“percent_rank”?的主要内容,如果未能解决你的问题,请参考以下文章
在 Spark DataFrame 中计算大于 0 的值的更快方法?