Spark Basic Statistics: the DataFrame-based API | With Video

Posted by 清华计算机学堂

This article uses spark.ml, the DataFrame-based API, and walks through three examples: correlation (Correlation), hypothesis testing (Hypothesis testing), and summary statistics (Summarizer).

01

Correlation

Calculating the correlation between two series of data is a common operation in statistics. spark.ml provides the flexibility to calculate pairwise correlations among many series. The correlation methods currently supported are Pearson's and Spearman's correlation.

Correlation computes the correlation matrix for an input Dataset of vectors using the specified method. The output is a DataFrame that contains the correlation matrix of the column of vectors.

Example 1, Scala:

# spark-shell
scala> import org.apache.spark.ml.linalg.{Matrix, Vectors}
scala> import org.apache.spark.ml.stat.Correlation
scala> import org.apache.spark.sql.Row
scala> val data = Seq(
     | Vectors.sparse(4, Seq((0, 1.0), (3, -2.0))),
     | Vectors.dense(4.0, 5.0, 0.0, 3.0),
     | Vectors.dense(6.0, 7.0, 0.0, 8.0),
     | Vectors.sparse(4, Seq((0, 9.0), (3, 1.0)))
     | )
scala> val df = data.map(Tuple1.apply).toDF("features")
scala> val Row(coeff1: Matrix) = Correlation.corr(df, "features").head
scala> println(s"Pearson correlation matrix:\n $coeff1")
Pearson correlation matrix:
1.0                   0.055641488407465814  NaN  0.4004714203168137
0.055641488407465814  1.0                   NaN  0.9135958615342522
NaN                   NaN                   1.0  NaN
0.4004714203168137    0.9135958615342522    NaN  1.0
scala> val Row(coeff2: Matrix) = Correlation.corr(df, "features", "spearman").head
scala> println(s"Spearman correlation matrix:\n $coeff2")
Spearman correlation matrix:
1.0                  0.10540925533894532  NaN  0.40000000000000174
0.10540925533894532  1.0                  NaN  0.9486832980505141
NaN                  NaN                  1.0  NaN
0.40000000000000174  0.9486832980505141   NaN  1.0

Python:

# pyspark
>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.ml.stat import Correlation
>>> data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
... (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
... (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
... (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
>>> df = spark.createDataFrame(data, ["features"])
>>> r1 = Correlation.corr(df, "features").head()
>>> print("Pearson correlation matrix:\n" + str(r1[0]))
Pearson correlation matrix:
DenseMatrix([[ 1. , 0.05564149, nan, 0.40047142],
             [ 0.05564149, 1. , nan, 0.91359586],
             [ nan, nan, 1. , nan],
             [ 0.40047142, 0.91359586, nan, 1. ]])
>>> r2 = Correlation.corr(df, "features", "spearman").head()
>>> print("Spearman correlation matrix:\n" + str(r2[0]))
Spearman correlation matrix:
DenseMatrix([[ 1. , 0.10540926, nan, 0.4 ],
             [ 0.10540926, 1. , nan, 0.9486833 ],
             [ nan, nan, 1. , nan],
             [ 0.4 , 0.9486833 , nan, 1. ]])
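
The NaN entries are not an error: the third column of every input vector is 0, so its variance is zero and any correlation involving it is undefined. As a cross-check (a plain NumPy illustration, assuming NumPy is installed; this is not part of the Spark example), the same Pearson matrix can be recomputed locally:

# python (NumPy cross-check, not part of the Spark example)
import numpy as np

# The same four rows as above, with the sparse vectors written out densely.
X = np.array([
    [1.0, 0.0, 0.0, -2.0],
    [4.0, 5.0, 0.0,  3.0],
    [6.0, 7.0, 0.0,  8.0],
    [9.0, 0.0, 0.0,  1.0],
])

# np.corrcoef treats each row as a variable, so transpose to correlate the columns.
print(np.corrcoef(X.T))
# The zero-variance third column yields nan entries (NumPy also emits a warning),
# matching the NaN values printed by Spark above.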

02

Hypothesis testing

Hypothesis testing is a powerful tool in statistics for determining whether a result is statistically significant, that is, whether it could plausibly have occurred by chance. spark.ml currently supports Pearson's Chi-squared (χ²) test for independence.

ChiSquareTest runs Pearson's independence test of every feature against the label. For each feature, the (feature, label) pairs are converted into a contingency matrix, for which the Chi-squared statistic is computed. All label and feature values must be categorical.

Example 2, Scala:

# spark-shell
scala> import org.apache.spark.ml.linalg.{Vector, Vectors}
scala> import org.apache.spark.ml.stat.ChiSquareTest
scala> val data = Seq(
     | (0.0, Vectors.dense(0.5, 10.0)),
     | (0.0, Vectors.dense(1.5, 20.0)),
     | (1.0, Vectors.dense(1.5, 30.0)),
     | (0.0, Vectors.dense(3.5, 30.0)),
     | (0.0, Vectors.dense(3.5, 40.0)),
     | (1.0, Vectors.dense(3.5, 40.0))
     | )
scala> val df = data.toDF("label", "features")
scala> val chi = ChiSquareTest.test(df, "features", "label").head
scala> println(s"pValues = ${chi.getAs[Vector](0)}")
pValues = [0.6872892787909721,0.6822703303362126]
scala> println(s"degreesOfFreedom ${chi.getSeq[Int](1).mkString("[", ",", "]")}")
degreesOfFreedom [2,3]
scala> println(s"statistics ${chi.getAs[Vector](2)}")
statistics [0.75,1.5]

Python:

# pyspark
>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.ml.stat import ChiSquareTest
>>> data = [(0.0, Vectors.dense(0.5, 10.0)),
...         (0.0, Vectors.dense(1.5, 20.0)),
...         (1.0, Vectors.dense(1.5, 30.0)),
...         (0.0, Vectors.dense(3.5, 30.0)),
...         (0.0, Vectors.dense(3.5, 40.0)),
...         (1.0, Vectors.dense(3.5, 40.0))]
>>> df = spark.createDataFrame(data, ["label", "features"])
>>> r = ChiSquareTest.test(df, "features", "label").head()
>>> print("pValues: " + str(r.pValues))
pValues: [0.687289278791,0.682270330336]
>>> print("degreesOfFreedom: " + str(r.degreesOfFreedom))
degreesOfFreedom: [2, 3]
>>> print("statistics: " + str(r.statistics))
statistics: [0.75,1.5]
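
The first entries can be reproduced by hand: counting the six rows gives, for the first feature, the contingency table {0.5: (1, 0), 1.5: (1, 1), 3.5: (2, 1)} against the labels (0.0, 1.0), with (3 - 1) x (2 - 1) = 2 degrees of freedom. A minimal cross-check, assuming SciPy is installed (an illustration, not part of spark.ml):

# python (SciPy cross-check, not part of the Spark example)
from scipy.stats import chi2_contingency

# Rows = values of the first feature (0.5, 1.5, 3.5), columns = label (0.0, 1.0),
# counted from the six (label, features) rows above.
observed = [[1, 0],
            [1, 1],
            [2, 1]]

chi2, p, dof, _ = chi2_contingency(observed)
print(chi2, dof, p)  # ~0.75, 2, ~0.687 -- matching Spark's first statistic, degrees of freedom and p-value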

03

Summarizer

Summarizer provides vector-column summary statistics for DataFrames. The available metrics are the column-wise maximum, minimum, mean, variance and number of nonzeros, as well as the total count.

Example 3, Scala:

# spark-shell
scala> import org.apache.spark.ml.linalg.{Vector, Vectors}
scala> import org.apache.spark.ml.stat.Summarizer
scala> val data = Seq(
     | (Vectors.dense(2.0, 3.0, 5.0), 1.0),
     | (Vectors.dense(4.0, 6.0, 7.0), 2.0)
     | )
scala> val df = data.toDF("features", "weight")
scala> val (meanVal, varianceVal) = df.select(Summarizer.metrics("mean", "variance").
     |   summary($"features", $"weight").as("summary")).
     |   select("summary.mean", "summary.variance").
     |   as[(Vector, Vector)].first()
scala> println(s"with weight: mean = ${meanVal}, variance = ${varianceVal}")
with weight: mean = [3.333333333333333,5.0,6.333333333333333], variance = [2.0,4.5,2.0]
scala> val (meanVal2, varianceVal2) = df.select(Summarizer.mean($"features"),
     |   Summarizer.variance($"features")).as[(Vector, Vector)].first()
scala> println(s"without weight: mean = ${meanVal2}, variance = ${varianceVal2}")
without weight: mean = [3.0,4.5,6.0], variance = [2.0,4.5,2.0]
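
These numbers are easy to verify: the weighted mean of the first column is (1*2.0 + 2*4.0) / (1 + 2) = 3.33..., and the "without weight" variance is the unbiased (n - 1) sample variance. A small NumPy sketch (an illustration, not part of the Spark example):

# python (NumPy cross-check, not part of the Spark example)
import numpy as np

features = np.array([[2.0, 3.0, 5.0],
                     [4.0, 6.0, 7.0]])
weights = np.array([1.0, 2.0])

print(np.average(features, axis=0, weights=weights))  # [3.333..., 5.0, 6.333...] -- weighted mean
print(features.mean(axis=0))                          # [3.0, 4.5, 6.0]  -- unweighted mean
print(features.var(axis=0, ddof=1))                   # [2.0, 4.5, 2.0]  -- sample variance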

Python:

# pyspark
>>> from pyspark.ml.stat import Summarizer
>>> from pyspark.sql import Row
>>> from pyspark.ml.linalg import Vectors
>>> df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
... Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()
>>> summarizer = Summarizer.metrics("mean", "count")
>>> df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)
+-----------------------------------+
|aggregate_metrics(features, weight)|
+-----------------------------------+
|[[1.0,1.0,1.0], 1]                 |
+-----------------------------------+
>>> df.select(summarizer.summary(df.features)).show(truncate=False)
+--------------------------------+
|aggregate_metrics(features, 1.0)|
+--------------------------------+
|[[1.0,1.5,2.0], 2]              |
+--------------------------------+
>>> df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)
+--------------+
|mean(features)|
+--------------+
|[1.0,1.0,1.0] |
+--------------+
>>> df.select(Summarizer.mean(df.features)).show(truncate=False)
+--------------+
|mean(features)|
+--------------+
|[1.0,1.5,2.0] |
+--------------+
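
The other metrics listed at the start of this section (maximum, minimum, variance, number of nonzeros) are requested through the same Summarizer.metrics call; a brief continuation of the session above, with the output omitted:

# pyspark (continuation of the session above; output omitted)
>>> summarizer2 = Summarizer.metrics("max", "min", "variance", "numNonZeros")
>>> df.select(summarizer2.summary(df.features, df.weight)).show(truncate=False)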

04

Video walkthrough

The walkthrough video is below:

[Video: Spark Basic Statistics: the DataFrame-based API]

05

Reference book

大数据技术与应用-微课视频版

ISBN:978-7-302-53843-1

By 肖政宏, 李俊杰, and 谢志明

Price: 49.80 RMB