在 pyspark 中聚合 Kolmogorov Smirnov 测试

Posted

技术标签:

【中文标题】在 pyspark 中聚合 Kolmogorov Smirnov 测试【英文标题】:Aggregating Kolmogrov Smirnov Test in pyspark 【发布时间】:2020-09-12 06:44:19 【问题描述】:

有没有办法使用 groupBy 子句或某种聚合方法从 pyspark 中的 spark.mllib 库中应用 KS 测试? 例如,我有一个数据框df,其中包含IDRESULT 列,如下所示:

+-------+------+
|     ID|RESULT|
+-------+------+
|3648296|  2.73|
|3648296|  9.64|
|3648189|  0.03|
|3648189|  0.03|
|3648296|  2.51|
|3648189|  0.01|
|3648296|  1.75|
|3648296| 30.23|
|3648189|  0.02|
|3648189|  0.02|
|3648189|  0.02|
|3648296|  3.28|
|3648296| 32.55|
|3648296|  2.32|
|3648296| 34.58|
|3648296| 29.22|
|3648189|  0.02|
|3648296|  1.36|
|3648296|  1.64|
|3648296|  1.17|
+-------+------+

有 2 个IDs 36482963648189,它们对应的每个 RESULT 值都在几十万左右。 是否可以像这样应用 groupBy 函数:

from pyspark.mllib.stat import Statistics

normtest=df.groupBy('ID').Statistics.kolmogorovSmirnovTest(df.RESULT, "norm", 0, 1)

这样我得到一个输出数据框,如:

+-------+---------+----------+
|     ID|p-value  |statistic |
+-------+---------+----------+
|3648296|some val | some val |
|3648189|some val | some val |
+-------+---------+----------+

这可能吗?

【问题讨论】:

您能找到解决方案吗? 不,我没有,我不得不大幅减少我的数据集并使用熊猫,这违背了目的。您对此有解决方案吗? 我实际上设法使用分箱为此设计了一个解决方案。我会在这里发布。 【参考方案1】:

这可以通过对数据进行分箱来解决,然后对分箱数据(即直方图)执行Kolmogorov-Smirnov Test。 它不会产生最大距离,但如果您的有效分布是平滑的,那么结果应该足够接近。

通过对结果进行分桶,您可以确保一次仅将有限数量的项目(桶的数量)加载到内存中。

首先,我们需要实现 kstest 的直方图版本:

import numpy as np

def hist_kstest(hist: np.array, bin_edges: np.array, cdf):
    i = hist.cumsum()
    n = i[-1]

    bin_right_edges = bin_edges[1:]
    cdf_vals = cdf(bin_right_edges)
    
    statistic = np.max([
        cdf_vals - (i-1) / n,
        i / n - cdf_vals
    ])
    pvalue = stats.distributions.kstwo.sf(statistic, n)
    return statistic, pvalue

然后按如下方式使用:

from pyspark.sql import functions as F, types as T
from pyspark.ml.feature import QuantileDiscretizer
import pandas as pd
import numpy as np
from scipy import stats

# Choose the number of buckets. It depends on your memory
# availability and affects the accuracy of the test.
num_buckets = 1_000

# Choose the null hypothesis (H0)
h0_cdf = stats.norm(0, 1).cdf

# Bucket the result and get the buckets' edges
bucketizer = QuantileDiscretizer(
    numBuckets=num_buckets, inputCol='RESULT', outputCol='result_bucket'
).setHandleInvalid("keep").fit(df)
buckets = np.array(bucketizer.getSplits())

def kstest(key, pdf: pd.DataFrame):
    pdf.sort_values('result_bucket', inplace=True)
    hist = pdf['count'].to_numpy()
    # Some of the buckets might not appear in all the groups, so
    # we filter buckets that are not available.
    bin_edges = buckets[[0, *(pdf['result_bucket'].to_numpy() + 1)]]
    statistic, pvalue = hist_kstest(hist, bin_edges, h0_cdf)
    return pd.DataFrame([[*key, statistic, pvalue]])

df = bucketizer.transform(df).groupBy("ID", "result_bucket").agg(
    F.count("*").alias("count")
).groupby("ID").applyInPandas(kstest, "ID long, statistic double, pvalue double")

【讨论】:

感谢分享您的解决方案!挺有趣的!因此,您将数据分箱到桶中,然后将每个分箱数据集转换为 pandas 数据帧,并从头开始应用 KS 测试。通常,当您进行 pandas 转换时,它会将该数据集转储到驱动程序节点中。我想知道这种方法是否会在分布式节点本身中执行计算。也许如果每个节点的内存足够大? 通过对结果进行分桶,您可以确保一次仅将有限数量的项目(桶的数量)加载到内存中。 是的,我认为这是要走的路。我将对其进行测试并将其设置为答案。再次感谢!

以上是关于在 pyspark 中聚合 Kolmogorov Smirnov 测试的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark 中,是不是可以使用 1 个 groupBy 进行 2 个聚合?

在 pyspark 中聚合 5 分钟窗口

在 pyspark 中应用用户定义的聚合函数的替代方法

在 groupby 操作 PySpark 中聚合列中的稀疏向量

在 PySpark Dataframe 中结合旋转和分组聚合

如何在 PySpark SQL when() 子句中使用聚合值?