Pandas 与 MLLib 的协方差计算的确切 Apache-Spark NA 处理差异是啥?
Posted
技术标签:
【中文标题】Pandas 与 MLLib 的协方差计算的确切 Apache-Spark NA 处理差异是啥?【英文标题】:What is the Exact Apache-Spark NA Treatment Difference Pandas vs MLLib for Covariance Computation?Pandas 与 MLLib 的协方差计算的确切 Apache-Spark NA 处理差异是什么? 【发布时间】:2021-12-03 13:46:12 【问题描述】:我最近观察到covariance computation in Pandas 和MLLib equivalent 之间的结果存在显着差异。对于完全指定的输入(即没有任何 NA),结果相当接近,但对于缺失值则显着偏离。 Pandas source explains how NAs are treated 但我无法使用 Spark 重现结果。我在source 中找不到关于RowMatrix().computeCovariance()
对NA 的确切作用的文档 - 但我的Scala 充其量是非常公平的,我不熟悉BLAS,也许我错过了一些东西。由于我使用的是预构建的 macOS Spark 设置,因此我无法找到 BLAS 警告的原因:
WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
鉴于协方差对许多应用程序的重要性,我想知道是否有人可以阐明在 Apache Spark MLLib 中对协方差计算的缺失值的确切处理方式?
编辑:
此外,这在current Spark 3.2 release 中没有解决,因为The method `pd.DataFrame.cov()` is not implemented yet
。
假设如下设置:
from pyspark.sql import SparkSession
from pyspark.mllib.linalg.distributed import RowMatrix
spark = SparkSession.builder.appName("MyApp") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.getOrCreate()
sc = spark.sparkContext
good_rows = sc.parallelize([[11, 12, 13, 14, 16, 17, 18],
[21, 22, 23, 42, 26, 27, 28],
[31, 32, 33, 34, 36, 37, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1, 2, 3, 4, 6, 7, 8]])
bad_rows = sc.parallelize([[11, 12, None, 14, 16, None, 18],
[21, 22, None, 42, 26, None, 28],
[31, 32, None, 34, 36, None, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1, 2, 3, 4, 6, 7, 8]])
从good_rows
计算的协方差对于 Pandas 和 Spark 是相等的:
good_rows.toDF().toPandas().cov()
# Results in:
_1 _2 _3 _4 _5 _6 _7
_1 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_2 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_3 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_4 332.0 332.0 332.0 368.0 332.0 332.0 332.0
_5 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_6 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_7 350.0 350.0 350.0 332.0 350.0 350.0 350.0
spark.createDataFrame(RowMatrix(good_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 350.0 332.0 350.0 350.0 350.0
1 350.0 350.0 350.0 332.0 350.0 350.0 350.0
2 350.0 350.0 350.0 332.0 350.0 350.0 350.0
3 332.0 332.0 332.0 368.0 332.0 332.0 332.0
4 350.0 350.0 350.0 332.0 350.0 350.0 350.0
5 350.0 350.0 350.0 332.0 350.0 350.0 350.0
6 350.0 350.0 350.0 332.0 350.0 350.0 350.0
在非常不同的协方差矩阵中使用 bad_rows
results 运行相同,除非 Pandas 是 cov()
运行 min_periods=(bad_rows.count()/2)+1
bad_rows.toDF().toPandas().cov()
#Results in:
_1 _2 _3 _4 _5 _6 _7
_1 350.0 350.0 700.0 332.0 350.0 700.0 350.0
_2 350.0 350.0 700.0 332.0 350.0 700.0 350.0
_3 700.0 700.0 700.0 700.0 700.0 700.0 700.0
_4 332.0 332.0 700.0 368.0 332.0 700.0 332.0
_5 350.0 350.0 700.0 332.0 350.0 700.0 350.0
_6 700.0 700.0 700.0 700.0 700.0 700.0 700.0
_7 350.0 350.0 700.0 332.0 350.0 700.0 350.0
spark.createDataFrame(RowMatrix(bad_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 NaN 332.0 350.0 NaN 350.0
1 350.0 350.0 NaN 332.0 350.0 NaN 350.0
2 NaN NaN NaN NaN NaN NaN NaN
3 332.0 332.0 NaN 368.0 332.0 NaN 332.0
4 350.0 350.0 NaN 332.0 350.0 NaN 350.0
5 NaN NaN NaN NaN NaN NaN NaN
6 350.0 350.0 NaN 332.0 350.0 NaN 350.0
bad_rows.toDF().toPandas().cov(min_periods=(bad_rows.count()/2)+1)
# With 50% of dataframe rows +1 Pandas equals the Spark result:
_1 _2 _3 _4 _5 _6 _7
_1 350.0 350.0 NaN 332.0 350.0 NaN 350.0
_2 350.0 350.0 NaN 332.0 350.0 NaN 350.0
_3 NaN NaN NaN NaN NaN NaN NaN
_4 332.0 332.0 NaN 368.0 332.0 NaN 332.0
_5 350.0 350.0 NaN 332.0 350.0 NaN 350.0
_6 NaN NaN NaN NaN NaN NaN NaN
_7 350.0 350.0 NaN 332.0 350.0 NaN 350.0
我确实尝试将 None
设置为 0
和 mean
,但无法使用这些标准插补重现 MLLib 协方差结果,请参见下文。
# Zero NA fill:
zeroed_na_rows = sc.parallelize([[11, 12, 0, 14, 16, 0, 18],
[21, 22, 0, 42, 26, 0, 28],
[31, 32, 0, 34, 36, 0, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[1, 2, 3, 4, 6, 7, 8]])
spark.createDataFrame(RowMatrix(zeroed_na_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 379.0 332.0 350.0 391.0 350.0
1 350.0 350.0 379.0 332.0 350.0 391.0 350.0
2 379.0 379.0 606.7 319.6 379.0 646.3 379.0
3 332.0 332.0 319.6 368.0 332.0 324.4 332.0
4 350.0 350.0 379.0 332.0 350.0 391.0 350.0
5 391.0 391.0 646.3 324.4 391.0 690.7 391.0
6 350.0 350.0 379.0 332.0 350.0 391.0 350.0
# Mean NA fill:
mean_rows = sc.parallelize([[11, 12, 27, 14, 16, 37, 18],
[21, 22, 27, 42, 26, 37, 28],
[31, 32, 27, 34, 36, 37, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1, 2, 3, 4, 6, 7, 8]])
spark.createDataFrame(RowMatrix(mean_rows).computeCovariance().toArray().tolist()).toPandas()
#Results in (still different from Pandas.cov()):
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 298.0 332.0 350.0 280.0 350.0
1 350.0 350.0 298.0 332.0 350.0 280.0 350.0
2 298.0 298.0 290.8 287.2 298.0 280.0 298.0
3 332.0 332.0 287.2 368.0 332.0 280.0 332.0
4 350.0 350.0 298.0 332.0 350.0 280.0 350.0
5 280.0 280.0 280.0 280.0 280.0 280.0 280.0
6 350.0 350.0 298.0 332.0 350.0 280.0 350.0
如果不是这样,这里发生了什么?如何让 Spark MLLib 产生与 Pandas 相当相似的结果?
【问题讨论】:
【参考方案1】:我认为没有一种简单的方法可以在不重新实现自己的 cov 方法的情况下在 Spark 中重现 Pandas 对 NAN 的处理。
原因是 Pandas 只是忽略每个 NAN - 它不会用任何值替换它 - 这就是为什么你用 0 替换 NAN 或平均值不会导致相同的结果。相反,Pandas 似乎会丢弃具有缺失值的一对观测值,并计算剩余对的协方差。
另一方面,Spark 实现在被要求计算包含 NAN 的一组对的协方差时返回 NAN。我不知道这在代码/计算中究竟发生在什么时候,但据我所知,您无法通过更改默认参数轻松更改它,您可能必须创建自己的 cov 函数版本或者找到一种使用 NAN 对列进行预处理和后处理的方法,例如删除它们的 NAN 并计算协方差,然后将所得协方差矩阵中的 NAN 替换为这些值。
【讨论】:
谢谢。您能否详细说明是什么让您认为这只是由于取消了 NA?删除 NA 只会产生另一个 COV 矩阵。 要得到 Pandas 给您的结果,请尝试手动计算协方差矩阵,并为每对列删除至少包含一个 NaN 的值对。 不确定“又一个 COV 矩阵”是什么意思?其他价值观?其他形状?通过计算输入矩阵中每对两列的 cov 来计算 cov 矩阵。生成的 cov 矩阵的维度为n x n
,其中n
是输入矩阵的列数。当任何一列中有 NaN 时,它将与另一列中的伙伴一起被忽略。但除非值太少,否则仍将计算这两列的协方差并将填充 cov 矩阵 - 生成的 cov 矩阵将具有与一个不带 NaN 的相同形状。
如果你放弃 NA,SparkML 仍然无法计算出与 Pandas 相当的结果。也许您可以展示一些使它们符合要求的代码?
您是如何在 Spark 中删除 NA 的?您是否删除了包含 NA 的整行或整列?抱歉,无法显示任何代码以使它们符合要求,但如果您想了解 Pandas 如何“忽略” NA,请查看 [nancorr][1] 中的代码。它在输入矩阵上创建一个掩码,将每个 Nan 替换为 0,然后迭代矩阵以计算 cov,忽略具有 0 的值。您可以尝试在 Spark 中实现这种方法吗? [1]:github.com/pandas-dev/pandas/blob/…以上是关于Pandas 与 MLLib 的协方差计算的确切 Apache-Spark NA 处理差异是啥?的主要内容,如果未能解决你的问题,请参考以下文章
pandas基于时序数据计算模型预测推理需要的统计数据(累计时间长度变化变化率方差均值最大最小等):数据持续的时间(分钟)获得某一节点之后的数据总变化量获得范围内的统计量