Spark Distributed Machine Learning Source Code Analysis: Model Evaluation Metrics
Posted by 雨云飞
Spark is an outstanding big data framework: it is virtually unmatched for large-scale batch processing and holds its own in stream processing, while machine learning is the engine driving today's AI boom. Knowing how to apply AI techniques in big data scenarios has become an essential skill for any good data mining engineer. This article combines machine learning concepts with the structure of the Spark codebase to walk through distributed machine learning; I hope we can learn and improve together.
The component versions used in this article are Ubuntu 19.10, JDK 1.8.0_241, Scala 2.11.12, Hadoop 3.2.1, and Spark 2.4.5. As usual, start the Hadoop and Spark services first and open a spark-shell session.
1. Binary Classification
- True positive (TP): the label is positive and the prediction is also positive
- True negative (TN): the label is negative and the prediction is also negative
- False positive (FP): the label is negative but the prediction is positive
- False negative (FN): the label is positive but the prediction is negative
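These four counts are the building blocks of most binary classification metrics. As a quick illustration (a hedged sketch with made-up counts, not part of the original example), precision, recall, and the F1 score can be computed directly from them in the spark-shell:
// Hypothetical counts, for illustration only
val tp = 80.0; val fp = 20.0; val fn = 10.0
// Precision: fraction of positive predictions that are actually positive
val precisionExample = tp / (tp + fp)
// Recall: fraction of actual positives that the model found
val recallExample = tp / (tp + fn)
// F1: harmonic mean of precision and recall
val f1Example = 2 * precisionExample * recallExample / (precisionExample + recallExample)
println(s"Precision = $precisionExample, Recall = $recallExample, F1 = $f1Example")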
2. Binary Classification Example
A binary classifier separates the elements of a given dataset into one of two possible groups (for example, fraud or not fraud); it is a special case of multiclass classification. Most binary classification metrics can be generalized to multiclass classification metrics.
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// Load training data in LIBSVM format
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_binary_classification_data.txt")
// Split data into training (60%) and test (40%)
val Array(training, test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
training.cache()
// Run the training algorithm to build the model
val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(training)
// Clear the prediction threshold so the model returns probabilities
model.clearThreshold
// Compute raw scores on the test set
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}
// Instantiate the metrics object
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
// Precision by threshold
val precision = metrics.precisionByThreshold
precision.foreach { case (t, p) =>
println(s"Threshold: $t, Precision: $p")
}
// Recall by threshold
val recall = metrics.recallByThreshold
recall.foreach { case (t, r) =>
println(s"Threshold: $t, Recall: $r")
}
// Precision-recall curve
val PRC = metrics.pr
// F-measure by threshold
val f1Score = metrics.fMeasureByThreshold
f1Score.foreach { case (t, f) =>
println(s"Threshold: $t, F-score: $f, Beta = 1")
}
val beta = 0.5
val fScore = metrics.fMeasureByThreshold(beta)
fScore.foreach { case (t, f) =>
println(s"Threshold: $t, F-score: $f, Beta = 0.5")
}
// AUPRC
val auPRC = metrics.areaUnderPR
println(s"Area under precision-recall curve = $auPRC")
// Compute the thresholds used in the ROC and PR curves
val thresholds = precision.map(_._1)
// ROC curve
val roc = metrics.roc
// AUROC
val auROC = metrics.areaUnderROC
println(s"Area under ROC = $auROC")
3. Multiclass Classification
4. Multiclass Classification Example
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_multiclass_classification_data.txt")
val Array(training, test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
training.cache()
val model = new LogisticRegressionWithLBFGS().setNumClasses(3).run(training)
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}
val metrics = new MulticlassMetrics(predictionAndLabels)
// Confusion matrix
println("Confusion matrix:")
println(metrics.confusionMatrix)
// Overall statistics
val accuracy = metrics.accuracy
println("Summary Statistics")
println(s"Accuracy = $accuracy")
// Precision by label
val labels = metrics.labels
labels.foreach { l =>
println(s"Precision($l) = " + metrics.precision(l))
}
// Recall by label
labels.foreach { l =>
println(s"Recall($l) = " + metrics.recall(l))
}
// False positive rate by label
labels.foreach { l =>
println(s"FPR($l) = " + metrics.falsePositiveRate(l))
}
// F-measure by label
labels.foreach { l =>
println(s"F1-Score($l) = " + metrics.fMeasure(l))
}
// Weighted statistics
println(s"Weighted precision: ${metrics.weightedPrecision}")
println(s"Weighted recall: ${metrics.weightedRecall}")
println(s"Weighted F1 score: ${metrics.weightedFMeasure}")
println(s"Weighted false positive rate: ${metrics.weightedFalsePositiveRate}")
5. Multilabel Classification
6. Multilabel Classification Example
import org.apache.spark.mllib.evaluation.MultilabelMetrics
import org.apache.spark.rdd.RDD
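// Each element is a (predicted label set, true label set) pair for one document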
val scoreAndLabels: RDD[(Array[Double], Array[Double])] = sc.parallelize(
Seq((Array(0.0, 1.0), Array(0.0, 2.0)),
(Array(0.0, 2.0), Array(0.0, 1.0)),
(Array.empty[Double], Array(0.0)),
(Array(2.0), Array(2.0)),
(Array(2.0, 0.0), Array(2.0, 0.0)),
(Array(0.0, 1.0, 2.0), Array(0.0, 1.0)),
(Array(1.0), Array(1.0, 2.0))), 2)
val metrics = new MultilabelMetrics(scoreAndLabels)
println(s"Recall = ${metrics.recall}")
println(s"Precision = ${metrics.precision}")
println(s"F1 measure = ${metrics.f1Measure}")
println(s"Accuracy = ${metrics.accuracy}")
metrics.labels.foreach(label =>
println(s"Class $label precision = ${metrics.precision(label)}"))
metrics.labels.foreach(label => println(s"Class $label recall = ${metrics.recall(label)}"))
metrics.labels.foreach(label => println(s"Class $label F1-score = ${metrics.f1Measure(label)}"))
println(s"Micro recall = ${metrics.microRecall}")
println(s"Micro precision = ${metrics.microPrecision}")
println(s"Micro F1 measure = ${metrics.microF1Measure}")
println(s"Hamming loss = ${metrics.hammingLoss}")
println(s"Subset accuracy = ${metrics.subsetAccuracy}")
7. Ranking Algorithms
8. Ranking Algorithm Example
import org.apache.spark.mllib.evaluation.{RankingMetrics, RegressionMetrics}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
// Read the movie ratings data
val ratings = spark.read.textFile("data/mllib/sample_movielens_data.txt").rdd.map { line =>
val fields = line.split("::")
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
}.cache()
// Map each rating to 1 or 0, where 1 indicates a movie that should be recommended
val binarizedRatings = ratings.map(r => Rating(r.user, r.product,
if (r.rating > 0) 1.0 else 0.0)).cache()
// Summarize the ratings
val numRatings = ratings.count()
val numUsers = ratings.map(_.user).distinct().count()
val numMovies = ratings.map(_.product).distinct().count()
println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")
// Build the ALS model
val numIterations = 10
val rank = 10
val lambda = 0.01
val model = ALS.train(ratings, rank, numIterations, lambda)
// Define a function that clamps a rating to the range [0, 1]
def scaledRating(r: Rating): Rating = {
val scaledRating = math.max(math.min(r.rating, 1.0), 0.0)
Rating(r.user, r.product, scaledRating)
}
// Get the top ten recommendations for each user and clamp the predicted ratings to [0, 1]
val userRecommended = model.recommendProductsForUsers(10).map { case (user, recs) =>
(user, recs.map(scaledRating))
}
// Assume any movie a user rated 3 or higher (mapped to 1 above) is a relevant document,
// and compare it with the ten most relevant predicted documents
val userMovies = binarizedRatings.groupBy(_.user)
val relevantDocuments = userMovies.join(userRecommended).map {
  case (user, (actual, predictions)) =>
    (predictions.map(_.product), actual.filter(_.rating > 0.0).map(_.product).toArray)
}
// Instantiate the metrics object
val metrics = new RankingMetrics(relevantDocuments)
// Precision at K
Array(1, 3, 5).foreach { k =>
println(s"Precision at $k = ${metrics.precisionAt(k)}")
}
// Mean average precision (MAP)
println(s"Mean average precision = ${metrics.meanAveragePrecision}")
// Normalized discounted cumulative gain (NDCG)
Array(1, 3, 5).foreach { k =>
println(s"NDCG at $k = ${metrics.ndcgAt(k)}")
}
// Get the predicted rating for every (user, movie) pair
val allPredictions = model.predict(ratings.map(r => (r.user, r.product))).map(r => ((r.user, r.product), r.rating))
val allRatings = ratings.map(r => ((r.user, r.product), r.rating))
val predictionsAndLabels = allPredictions.join(allRatings).map {
  case ((user, product), (predicted, actual)) => (predicted, actual)
}
// Use regression metrics to get the RMSE
val regressionMetrics = new RegressionMetrics(predictionsAndLabels)
println(s"RMSE = ${regressionMetrics.rootMeanSquaredError}")
// R-squared
println(s"R-squared = ${regressionMetrics.r2}")
9. Regression Algorithm Evaluation
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.mllib.util.MLUtils
// Load the data in LIBSVM format as an RDD[LabeledPoint]
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_linear_regression_data.txt").cache()
// Build the model
val numIterations = 100
val model = LinearRegressionWithSGD.train(data, numIterations)
// Compute predictions
val valuesAndPreds = data.map{ point =>
val prediction = model.predict(point.features)
(prediction, point.label)
}
// Instantiate the metrics object
val metrics = new RegressionMetrics(valuesAndPreds)
// Squared error
println(s"MSE = ${metrics.meanSquaredError}")
println(s"RMSE = ${metrics.rootMeanSquaredError}")
// R-squared
println(s"R-squared = ${metrics.r2}")
// Mean absolute error
println(s"MAE = ${metrics.meanAbsoluteError}")
// Explained variance
println(s"Explained variance = ${metrics.explainedVariance}")
This concludes our coverage of Spark's model evaluation metrics and the accompanying source code analysis. For Spark fundamentals, please refer to the earlier articles in this series.
Reference:
http://spark.apache.org/docs/latest/mllib-evaluation-metrics.html