Pandas 与 MLLib 的协方差计算的确切 Apache-Spark NA 处理差异是啥?

Posted

技术标签:

【中文标题】Pandas 与 MLLib 的协方差计算的确切 Apache-Spark NA 处理差异是啥?【英文标题】:What is the Exact Apache-Spark NA Treatment Difference Pandas vs MLLib for Covariance Computation?Pandas 与 MLLib 的协方差计算的确切 Apache-Spark NA 处理差异是什么? 【发布时间】:2021-12-03 13:46:12 【问题描述】:

我最近观察到covariance computation in Pandas 和MLLib equivalent 之间的结果存在显着差异。对于完全指定的输入(即没有任何 NA),结果相当接近,但对于缺失值则显着偏离。 Pandas source explains how NAs are treated 但我无法使用 Spark 重现结果。我在source 中找不到关于RowMatrix().computeCovariance() 对NA 的确切作用的文档 - 但我的Scala 充其量是非常公平的,我不熟悉BLAS,也许我错过了一些东西。由于我使用的是预构建的 macOS Spark 设置,因此我无法找到 BLAS 警告的原因:

WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS

鉴于协方差对许多应用程序的重要性,我想知道是否有人可以阐明在 Apache Spark MLLib 中对协方差计算的缺失值的确切处理方式?

编辑: 此外,这在current Spark 3.2 release 中没有解决,因为The method `pd.DataFrame.cov()` is not implemented yet

假设如下设置:

from pyspark.sql import SparkSession
from pyspark.mllib.linalg.distributed import RowMatrix

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()
sc = spark.sparkContext
good_rows = sc.parallelize([[11, 12, 13, 14, 16, 17, 18], 
                            [21, 22, 23, 42, 26, 27, 28],
                            [31, 32, 33, 34, 36, 37, 38],
                            [41, 42, 43, 44, 46, 47, 48],
                            [51, 52, 53, 54, 56, 57, 58],
                            [ 1,  2,  3,  4,  6,  7,  8]])
bad_rows = sc.parallelize([[11, 12, None, 14, 16, None, 18], 
                           [21, 22, None, 42, 26, None, 28],
                           [31, 32, None, 34, 36, None, 38],
                           [41, 42, 43, 44, 46, 47, 48],
                           [51, 52, 53, 54, 56, 57, 58],
                           [ 1,  2,  3,  4,  6,  7,  8]])

good_rows 计算的协方差对于 Pandas 和 Spark 是相等的:

good_rows.toDF().toPandas().cov()
# Results in:
       _1     _2     _3     _4     _5     _6     _7
_1  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_2  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_3  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_4  332.0  332.0  332.0  368.0  332.0  332.0  332.0
_5  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_6  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_7  350.0  350.0  350.0  332.0  350.0  350.0  350.0

spark.createDataFrame(RowMatrix(good_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
      _1     _2     _3     _4     _5     _6     _7
0  350.0  350.0  350.0  332.0  350.0  350.0  350.0
1  350.0  350.0  350.0  332.0  350.0  350.0  350.0
2  350.0  350.0  350.0  332.0  350.0  350.0  350.0
3  332.0  332.0  332.0  368.0  332.0  332.0  332.0
4  350.0  350.0  350.0  332.0  350.0  350.0  350.0
5  350.0  350.0  350.0  332.0  350.0  350.0  350.0
6  350.0  350.0  350.0  332.0  350.0  350.0  350.0

在非常不同的协方差矩阵中使用 bad_rowsresults 运行相同,除非 Pandas 是 cov() 运行 min_periods=(bad_rows.count()/2)+1

bad_rows.toDF().toPandas().cov()
#Results in: 
       _1     _2     _3     _4     _5     _6     _7
_1  350.0  350.0  700.0  332.0  350.0  700.0  350.0
_2  350.0  350.0  700.0  332.0  350.0  700.0  350.0
_3  700.0  700.0  700.0  700.0  700.0  700.0  700.0
_4  332.0  332.0  700.0  368.0  332.0  700.0  332.0
_5  350.0  350.0  700.0  332.0  350.0  700.0  350.0
_6  700.0  700.0  700.0  700.0  700.0  700.0  700.0
_7  350.0  350.0  700.0  332.0  350.0  700.0  350.0
spark.createDataFrame(RowMatrix(bad_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
      _1     _2  _3     _4     _5  _6     _7
0  350.0  350.0 NaN  332.0  350.0 NaN  350.0
1  350.0  350.0 NaN  332.0  350.0 NaN  350.0
2    NaN    NaN NaN    NaN    NaN NaN    NaN
3  332.0  332.0 NaN  368.0  332.0 NaN  332.0
4  350.0  350.0 NaN  332.0  350.0 NaN  350.0
5    NaN    NaN NaN    NaN    NaN NaN    NaN
6  350.0  350.0 NaN  332.0  350.0 NaN  350.0

bad_rows.toDF().toPandas().cov(min_periods=(bad_rows.count()/2)+1)
# With 50% of dataframe rows +1 Pandas equals the Spark result:
       _1     _2  _3     _4     _5  _6     _7
_1  350.0  350.0 NaN  332.0  350.0 NaN  350.0
_2  350.0  350.0 NaN  332.0  350.0 NaN  350.0
_3    NaN    NaN NaN    NaN    NaN NaN    NaN
_4  332.0  332.0 NaN  368.0  332.0 NaN  332.0
_5  350.0  350.0 NaN  332.0  350.0 NaN  350.0
_6    NaN    NaN NaN    NaN    NaN NaN    NaN
_7  350.0  350.0 NaN  332.0  350.0 NaN  350.0

我确实尝试将 None 设置为 0mean,但无法使用这些标准插补重现 MLLib 协方差结果,请参见下文。

# Zero NA fill:
zeroed_na_rows = sc.parallelize([[11, 12, 0, 14, 16, 0, 18], 
                       [21, 22, 0, 42, 26, 0, 28],
                       [31, 32, 0, 34, 36, 0, 38],
                       [41, 42, 43, 44, 46, 47, 48],
                       [51, 52, 53, 54, 56, 57, 58],
                       [1, 2, 3, 4, 6, 7, 8]])
spark.createDataFrame(RowMatrix(zeroed_na_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
      _1     _2     _3     _4     _5     _6     _7
0  350.0  350.0  379.0  332.0  350.0  391.0  350.0
1  350.0  350.0  379.0  332.0  350.0  391.0  350.0
2  379.0  379.0  606.7  319.6  379.0  646.3  379.0
3  332.0  332.0  319.6  368.0  332.0  324.4  332.0
4  350.0  350.0  379.0  332.0  350.0  391.0  350.0
5  391.0  391.0  646.3  324.4  391.0  690.7  391.0
6  350.0  350.0  379.0  332.0  350.0  391.0  350.0

# Mean NA fill:
mean_rows = sc.parallelize([[11, 12, 27, 14, 16, 37, 18], 
                           [21, 22, 27, 42, 26, 37, 28],
                           [31, 32, 27, 34, 36, 37, 38],
                           [41, 42, 43, 44, 46, 47, 48],
                           [51, 52, 53, 54, 56, 57, 58],
                           [ 1,  2,  3,  4,  6,  7,  8]])
spark.createDataFrame(RowMatrix(mean_rows).computeCovariance().toArray().tolist()).toPandas()
#Results in (still different from Pandas.cov()):
      _1     _2     _3     _4     _5     _6     _7
0  350.0  350.0  298.0  332.0  350.0  280.0  350.0
1  350.0  350.0  298.0  332.0  350.0  280.0  350.0
2  298.0  298.0  290.8  287.2  298.0  280.0  298.0
3  332.0  332.0  287.2  368.0  332.0  280.0  332.0
4  350.0  350.0  298.0  332.0  350.0  280.0  350.0
5  280.0  280.0  280.0  280.0  280.0  280.0  280.0
6  350.0  350.0  298.0  332.0  350.0  280.0  350.0

如果不是这样,这里发生了什么?如何让 Spark MLLib 产生与 Pandas 相当相似的结果?

【问题讨论】:

【参考方案1】:

我认为没有一种简单的方法可以在不重新实现自己的 cov 方法的情况下在 Spark 中重现 Pandas 对 NAN 的处理。

原因是 Pandas 只是忽略每个 NAN - 它不会用任何值替换它 - 这就是为什么你用 0 替换 NAN 或平均值不会导致相同的结果。相反,Pandas 似乎会丢弃具有缺失值的一对观测值,并计算剩余对的协方差。

另一方面,Spark 实现在被要求计算包含 NAN 的一组对的协方差时返回 NAN。我不知道这在代码/计算中究竟发生在什么时候,但据我所知,您无法通过更改默认参数轻松更改它,您可能必须创建自己的 cov 函数版本或者找到一种使用 NAN 对列进行预处理和后处理的方法,例如删除它们的 NAN 并计算协方差,然后将所得协方差矩阵中的 NAN 替换为这些值。

【讨论】:

谢谢。您能否详细说明是什么让您认为这只是由于取消了 NA?删除 NA 只会产生另一个 COV 矩阵。 要得到 Pandas 给您的结果,请尝试手动计算协方差矩阵,并为每对列删除至少包含一个 NaN 的值对。 不确定“又一个 COV 矩阵”是什么意思?其他价值观?其他形状?通过计算输入矩阵中每对两列的 cov 来计算 cov 矩阵。生成的 cov 矩阵的维度为n x n,其中n 是输入矩阵的列数。当任何一列中有 NaN 时,它将与另一列中的伙伴一起被忽略。但除非值太少,否则仍将计算这两列的协方差并将填充 cov 矩阵 - 生成的 cov 矩阵将具有与一个不带 NaN 的相同形状。 如果你放弃 NA,SparkML 仍然无法计算出与 Pandas 相当的结果。也许您可以展示一些使它们符合要求的代码? 您是如何在 Spark 中删除 NA 的?您是否删除了包含 NA 的整行或整列?抱歉,无法显示任何代码以使它们符合要求,但如果您想了解 Pandas 如何“忽略” NA,请查看 [nancorr][1] 中的代码。它在输入矩阵上创建一个掩码,将每个 Nan 替换为 0,然后迭代矩阵以计算 cov,忽略具有 0 的值。您可以尝试在 Spark 中实现这种方法吗? [1]:github.com/pandas-dev/pandas/blob/…

以上是关于Pandas 与 MLLib 的协方差计算的确切 Apache-Spark NA 处理差异是啥?的主要内容,如果未能解决你的问题,请参考以下文章

Pandas计算标准差

pandas数据计算和统计

pandas:根据另一列中的值获取与相应索引的确切对应值

Python:使用pandas和numpy计算标准差的区别

pandas基于时序数据计算模型预测推理需要的统计数据(累计时间长度变化变化率方差均值最大最小等):数据持续的时间(分钟)获得某一节点之后的数据总变化量获得范围内的统计量

Spark-Mllib基本统计