在 Spark 中使用 PCA 进行异常检测

Posted

技术标签:

【中文标题】在 Spark 中使用 PCA 进行异常检测【英文标题】:Anomaly detection with PCA in Spark 【发布时间】:2018-09-06 21:26:38 【问题描述】:

我阅读了以下文章

Anomaly detection with Principal Component Analysis (PCA)

在文章中是这样写的:

• PCA 算法基本上将数据读数从现有坐标系转换为新坐标系。

• 数据读数越接近新坐标系的中心,这些读数就越接近最佳值。

• 异常分数是使用读数与所有读数的平均值之间的马氏距离计算的,该平均值是变换坐标系的中心。

谁能更详细地描述一下使用 PCA 进行异常检测(使用 PCA 分数和 Mahalanobis 距离)?我很困惑,因为 PCA 的定义是:PCA 是一种统计过程,它使用正交变换将一组可能相关变量的观察值转换为一组线性不相关变量的值。当变量之间没有更多相关性时如何使用马氏距离?

谁能解释我如何在 Spark 中做到这一点? pca.transform 函数是否返回我应该计算每次读数到中心的马氏距离的分数?

【问题讨论】:

请提供语言信息。斯卡拉好吗? 如果可能的话,我想要Java 语言。否则,它可以是 scala。 【参考方案1】:

假设您有一个 3 维点的数据集。 每个点都有坐标(x, y, z)。 那些(x, y, z) 是维度。 由三个值表示的点 e。 G。 (8, 7, 4)。它称为输入向量。

当您应用 PCA 算法时,您基本上将输入向量转换为新向量。它可以表示为转为(x, y, z) => (v, w).的函数

示例:(8, 7, 4) => (-4, 13)

现在您收到了一个更短的向量(您减少了一个维度),但您的点仍然有坐标,即(v, w)。这意味着您可以使用马氏测量来计算两点之间的距离。距离平均坐标较远的点实际上是异常点。

示例解决方案:

import breeze.linalg.DenseVector, inv
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.PCA, StandardScaler, VectorAssembler
import org.apache.spark.ml.linalg.Matrix, Vector
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.DataFrame, Row, SparkSession
import org.apache.spark.sql.functions._

object SparkApp extends App 
  val session = SparkSession.builder()
    .appName("spark-app").master("local[*]").getOrCreate()
  session.sparkContext.setLogLevel("ERROR")
  import session.implicits._

  val df = Seq(
    (1, 4, 0),
    (3, 4, 0),
    (1, 3, 0),
    (3, 3, 0),
    (67, 37, 0) //outlier
  ).toDF("x", "y", "z")
  val vectorAssembler = new VectorAssembler().setInputCols(Array("x", "y", "z")).setOutputCol("vector")
  val standardScalar = new StandardScaler().setInputCol("vector").setOutputCol("normalized-vector").setWithMean(true)
    .setWithStd(true)

  val pca = new PCA().setInputCol("normalized-vector").setOutputCol("pca-features").setK(2)

  val pipeline = new Pipeline().setStages(
    Array(vectorAssembler, standardScalar, pca)
  )

  val pcaDF = pipeline.fit(df).transform(df)

  def withMahalanobois(df: DataFrame, inputCol: String): DataFrame = 
    val Row(coeff1: Matrix) = Correlation.corr(df, inputCol).head

    val invCovariance = inv(new breeze.linalg.DenseMatrix(2, 2, coeff1.toArray))

    val mahalanobois = udf[Double, Vector]  v =>
      val vB = DenseVector(v.toArray)
      vB.t * invCovariance * vB
    

    df.withColumn("mahalanobois", mahalanobois(df(inputCol)))
  

  val withMahalanobois: DataFrame = withMahalanobois(pcaDF, "pca-features")

  session.close()

【讨论】:

假设我有一个 10x3 数据集(10 个 3 维输入向量)。在 pca 之后(如果我使用 2 个主成分),我应该成为 10x2 数据集。这个 10x2 数据集是我的“我的新数据集”,有 2 个维度。现在我想探索第一个输入向量(10x2 的第一行)是否是异常值。我现在应该计算这个向量(1x2)到 10x2 数据集中心的马氏距离吗? 马氏距离使用协方差矩阵和平均值。现在我应该计算每列的平均值(在本例中为两列),以及数据集 10x2 的协方差矩阵。而不是使用马氏方程,我应该计算输入向量 (1x2) 到中心的距离。 但是当我计算 10x2 数据集的协方差矩阵时,我变成了矩阵 1.0 的主对角线,其余的都是非常小的数字,大约为 0。在我看来,这是意料之中的,因为PCA 将数据集转换为不相关的变量。为什么马氏距离比? 这相当于计算欧几里得距离,因为协方差矩阵是恒等矩阵。我错了吗? 第一条评论:是的,没错。我认为马氏距离在这里得到了很好的解释:jennessent.com/arcview/mahalanobis_description.htm

以上是关于在 Spark 中使用 PCA 进行异常检测的主要内容,如果未能解决你的问题,请参考以下文章

通过python扩展spark mllib 算法包(e.g.基于spark使用孤立森林进行异常检测)

通过python扩展spark mllib 算法包(e.g.基于spark使用孤立森林进行异常检测)

数据挖掘(异常检测)——线性方法

系统检测到异常流量,请问这个该如何解决

PCA图像数据降维及重构误差分析实战并使用TSNE进行异常数据可视化分析

常用的异常检测算法有哪些?