如何在 Spark 中计算过去时间值的“percent_rank”?

Posted

技术标签:

【中文标题】如何在 Spark 中计算过去时间值的“percent_rank”?【英文标题】:How to calculate `percent_rank` on past time values in Spark? 【发布时间】:2017-08-23 08:41:43 【问题描述】:

我想计算列 xpercent_rank,但排名应该仅与 过去 值相关 - 由时间戳变量 t 确定。

似乎F.percent_rank() 不接受任何参数并且要获得独立于时间的排名,您将使用percent_rank().over(Window.orderBy("x"))?!

有没有办法根据时间戳较小的值的数量来获得排名?

预期的结果是这样的

t     x     perc_rank_win
0     1     0.0
1     3     1.0           # since 3 is largest from [1, 3]
2     5     1.0           # since 5 is largest from [1, 3, 5]
3     4     0.66          # since values are [1, 3, 4!, 5]
4     2     0.25          # since [1, 2!, 3, 4, 5]

【问题讨论】:

【参考方案1】:

这是我尝试在窗口分区上使用 collect_set 的另一个解决方法,

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(0,1),(1,3),(2,5),(3,4),(4,2)],['t','x'])
df.show()
+---+---+
|  t|  x|
+---+---+
|  0|  1|
|  1|  3|
|  2|  5|
|  3|  4|
|  4|  2|
+---+---+
w = Window.orderBy('t')
df = df.withColumn('somecol',F.collect_set('x').over(w))
df.show()
+---+---+---------------+
|  t|  x|        somecol|
+---+---+---------------+
|  0|  1|            [1]|
|  1|  3|         [1, 3]|
|  2|  5|      [1, 5, 3]|
|  3|  4|   [1, 5, 3, 4]|
|  4|  2|[1, 5, 2, 3, 4]|
+---+---+---------------+
def pct_rank(s,v):
    x=sorted(s)
    if len(x) == 1:
        return float(0)
    else:
        pc = float(1)/(len(x)-1)
        idx = x.index(v)
        return float("0:.2f".format(idx*pc))

pct_udf = F.udf(pct_rank)
df.select("t","x",pct_udf(df['somecol'],df['x']).alias('pct_rank')).show()
+---+---+--------+
|  t|  x|pct_rank|
+---+---+--------+
|  0|  1|     0.0|
|  1|  3|     1.0|
|  2|  5|     1.0|
|  3|  4|    0.67|
|  4|  2|    0.25|
+---+---+--------+

【讨论】:

【参考方案2】:

为了获得滚动的percent_rank(),您必须能够使用窗口框架定义来对您根本不能的功能进行排名。 (类似这样的w = Window.orderBy('t', 'x').rowsBetween(-sys.maxsize, 0)

我找到了一种解决方法,但它涉及一个非常昂贵的笛卡尔连接:

首先让我们创建示例数据框:

import pyspark.sql.functions as psf
from pyspark.sql import HiveContext
hc = HiveContext(sc)
df = hc.createDataFrame(sc.parallelize(zip(range(5), [1,3,5,4,2])), ['t', 'x'])

笛卡尔连接:

df2 = df.groupBy(df.x.alias('x2')).agg(psf.min("t").alias("t2"))
df_cross = df.join(df2).filter("t2 <= t").withColumn("isSup", (df.x > df2.x2).cast("int"))

    +---+---+---+---+-----+
    |  t|  x| t2| x2|isSup|
    +---+---+---+---+-----+
    |  1|  3|  0|  1|    1|
    |  2|  5|  0|  1|    1|
    |  2|  5|  1|  3|    1|
    |  3|  4|  0|  1|    1|
    |  3|  4|  1|  3|    1|
    |  3|  4|  2|  5|    0|
    |  4|  2|  0|  1|    1|
    |  4|  2|  1|  3|    0|
    |  4|  2|  2|  5|    0|
    |  4|  2|  3|  4|    0|
    +---+---+---+---+-----+

最后我们按't'、'x'分组:

df_fin = df_cross.groupBy("t", "x").agg(
    psf.count("*").alias("count"), 
    psf.sum("isSup").alias("rank")
).withColumn('pct_rank_win', psf.col("rank")/psf.greatest(psf.col('count') - 1, psf.lit(1)))

    +---+---+-----+----+------------------+
    |  t|  x|count|rank|      pct_rank_win|
    +---+---+-----+----+------------------+
    |  0|  1|    1|   0|               0.0|
    |  1|  3|    2|   1|               1.0|
    |  2|  5|    3|   2|               1.0|
    |  3|  4|    4|   2|0.6666666666666666|
    |  4|  2|    5|   1|              0.25|
    +---+---+-----+----+------------------+

df2 定义中的groupBy('x') 是为了确保密集排名(相同的值将具有相同的排名),如下例所示:

df = hc.createDataFrame(sc.parallelize(zip(range(6), [1,3,3,5,4,2])), ['t', 'x'])

    +---+---+-----+----+------------------+
    |  t|  x|count|rank|      pct_rank_win|
    +---+---+-----+----+------------------+
    |  0|  1|    1|   0|               0.0|
    |  1|  3|    2|   1|               1.0|
    |  2|  3|    2|   1|               1.0|
    |  3|  5|    3|   2|               1.0|
    |  4|  4|    4|   2|0.6666666666666666|
    |  5|  2|    5|   1|              0.25|
    +---+---+-----+----+------------------+

【讨论】:

以上是关于如何在 Spark 中计算过去时间值的“percent_rank”?的主要内容,如果未能解决你的问题,请参考以下文章

计算数据帧 Spark 中缺失值的数量

在 Spark DataFrame 中计算大于 0 的值的更快方法?

在 python 或 spark 中获取大数据缺失值的最快方法是啥?

如何在 Spark/Scala 中查找具有许多空值的列

使用 CoreData 关系计算摘要

计算Spark DataFrame中的非空值的数量