Logistic Regression in Spark and P/R/F Evaluation

Posted by 猫二哥

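1 Logistic regression

Logistic regression applies to binary classification. It is trained on samples that already carry a class label and then predicts the label of unlabeled samples. The output is a probability, normally the probability that the sample is positive.

Input:
Data in LIBSVM format.
A sample file:
sample_binary_classification_data.txt ships in the Spark directory; it has too many attributes to reproduce here. Data like this usually lives in a table with columns att1, att2, ..., attn, Label, and a SQL statement can convert it to LIBSVM format fairly easily.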
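The conversion from a table row to a LIBSVM line can also be sketched in plain Scala. This is only an illustration: `LibSvmFormat` and `toLibSvmLine` are hypothetical names, not part of Spark, and the sketch assumes the attributes are already numeric and arrive in column order.

```scala
// Hypothetical helper (not part of Spark): formats one labeled row as a LIBSVM line.
object LibSvmFormat {
  // LIBSVM lines look like "label index:value index:value ...".
  // Indices are 1-based and ascending; zero-valued features are left out (sparse format).
  def toLibSvmLine(label: Int, features: Seq[Double]): String = {
    val pairs = features.zipWithIndex.collect {
      case (value, i) if value != 0.0 => s"${i + 1}:$value"
    }
    (label.toString +: pairs).mkString(" ")
  }
}
```

For instance, `LibSvmFormat.toLibSvmLine(1, Seq(0.5, 0.0, 2.0))` produces a line with the label followed by features 1 and 3, skipping the zero in position 2.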
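Output:
Training yields the logistic regression coefficients. For any unlabeled sample, the weighted sum of its attributes and the coefficients, passed through the sigmoid function, is the predicted probability that the sample is positive.
For example, each line below is (predicted probability of positive --> true label of the sample):
0.9999999999883085-->1.0
0.999999905633042-->1.0
5.131563021283302E-10-->0.0
Look up the underlying concepts for details.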
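How the coefficients turn into a probability can be shown with a few lines of plain Scala. This is a minimal sketch, not Spark's internal code: `LogisticScore`, `weights`, and `intercept` are illustrative names, and the values passed in would come from a trained model.

```scala
// Illustrative sketch: turning coefficients and attributes into a probability.
object LogisticScore {
  // The sigmoid squashes any real number into the interval (0, 1).
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // weights and intercept are assumed to come from training; features is one sample.
  def probability(weights: Seq[Double], intercept: Double, features: Seq[Double]): Double = {
    require(weights.length == features.length, "weights and features must have the same length")
    // Weighted sum of attributes and coefficients, plus the intercept.
    val margin = weights.zip(features).map { case (w, x) => w * x }.sum + intercept
    sigmoid(margin)
  }
}
```

A large positive weighted sum gives a probability near 1, a large negative one gives a probability near 0, which matches the extreme values in the sample output.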

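2 Evaluating logistic regression

Evaluation uses the P, R, and F values, which suit imbalanced samples. The metrics mean:
p: precision, the share of predicted positives that are truly positive; higher is better
r: recall, the share of truly positive samples that the predictions cover; higher is better
F value: the harmonic mean of p and r; higher is better
p and r usually pull against each other: pushing one very high tends to drive the other down, as their formulas make clear.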
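For reference, with TP, FP, and FN as the counts of true positives, false positives, and false negatives, the standard formulas are P = TP / (TP + FP), R = TP / (TP + FN), and F_beta = (1 + beta^2) * P * R / (beta^2 * P + R). The sketch below computes them in plain Scala; `PrfMetrics` is an illustrative name, and returning 0.0 for an empty denominator is an assumption of this sketch, not necessarily what Spark does.

```scala
// Illustrative sketch of precision, recall, and F-measure from confusion counts.
object PrfMetrics {
  // tp, fp, fn: counts of true positives, false positives, false negatives.
  def precision(tp: Long, fp: Long): Double =
    if (tp + fp == 0) 0.0 else tp.toDouble / (tp + fp) // 0.0 on empty denominator is an assumption

  def recall(tp: Long, fn: Long): Double =
    if (tp + fn == 0) 0.0 else tp.toDouble / (tp + fn)

  // F-beta: beta > 1 weights recall more heavily, beta < 1 weights precision more heavily.
  def fMeasure(p: Double, r: Double, beta: Double = 1.0): Double = {
    val b2 = beta * beta
    if (p + r == 0.0) 0.0 else (1 + b2) * p * r / (b2 * p + r)
  }
}
```

Predicting "positive" more often raises TP and FP together, so recall goes up while precision tends to fall, which is the trade-off described above.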
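The area under the ROC curve (AUC) describes the lift: how much true positive rate is gained as more false positives are tolerated. If accepting a 10% false positive rate yields a 60% true positive rate, the model is usable. The larger the AUC the better (close to 1); look up the precise definition for details. A separate article on evaluation methods will follow.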
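To make the AUC less abstract, here is a small plain-Scala sketch that builds the ROC points from (score, label) pairs and integrates them with the trapezoidal rule. It is written for readability, not speed, and is not how Spark computes the value internally; `RocAuc` and its methods are illustrative names, and treating a score at or above the threshold as positive is an assumption.

```scala
// Illustrative sketch: ROC points and AUC from (score, label) pairs.
object RocAuc {
  // Input: (score, label) pairs, label 1.0 for positive and 0.0 for negative.
  // Returns ROC points (falsePositiveRate, truePositiveRate), starting at (0, 0).
  def rocPoints(scored: Seq[(Double, Double)]): Seq[(Double, Double)] = {
    val pos = scored.count(_._2 == 1.0).toDouble
    val neg = scored.size - pos
    require(pos > 0 && neg > 0, "need at least one positive and one negative sample")
    // Lower the threshold one distinct score at a time, highest first.
    val thresholds = scored.map(_._1).distinct.sorted.reverse
    val points = thresholds.map { t =>
      val predictedPositive = scored.filter(_._1 >= t)
      val tp = predictedPositive.count(_._2 == 1.0)
      val fp = predictedPositive.size - tp
      (fp / neg, tp / pos)
    }
    (0.0, 0.0) +: points
  }

  // Trapezoidal rule over consecutive ROC points.
  def auc(points: Seq[(Double, Double)]): Double =
    points.sliding(2).collect { case Seq((x1, y1), (x2, y2)) =>
      (x2 - x1) * (y1 + y2) / 2.0
    }.sum
}
```

When every positive sample scores higher than every negative one, the curve goes straight up and then straight across, and the area is 1.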

3 Spark implementation (this is essentially the example bundled with Spark, with some changes and comments)

package org.wq.scala.ml
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
/**
  * Created by Administrator on 2016/10/21.
  */
object LogicRegression {

  def main(args: Array[String]): Unit = {
    // Two arguments: the LIBSVM training sample, then the path to save the model to

    if (args.length != 2) {
      println("Please pass two arguments: the LIBSVM training sample, then the model save path")
      System.exit(1)
    }
    val data_path = args(0)
    val model_path = args(1)
    val conf = new SparkConf().setAppName("BinaryClassificationMetricsExample")
    val sc = new SparkContext(conf)
    // $example on$
    // Load training data in LIBSVM format
    // Each line has the form: Label 1:value 2:value
    // The label must be 1 or 0 for binary logistic regression
    // Data in this format can usually be built with SQL
    // Result type: RDD[LabeledPoint]
    val data = MLUtils.loadLibSVMFile(sc, data_path)

    // Split data into training (60%) and test (40%)
    val Array(training, test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    training.cache()

    // Run training algorithm to build the model
    // L-BFGS is an optimization algorithm that plays a role similar to gradient descent
    // setNumClasses(2) means there are two class labels
    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(2)
      .run(training)

    // Clear the prediction threshold so the model will return probabilities
    // Without a threshold, predict returns the raw probability instead of a 0/1 label
    model.clearThreshold()

    // Compute raw scores on the test set
    // Each result is (predicted probability, true label); the probability is for the positive class 1
    val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
      val prediction = model.predict(features)
      (prediction, label)
    }
    predictionAndLabels.collect().foreach { x =>
      println(s"${x._1}-->${x._2}")
    }

    // Saving and loading the model
    model.save(sc, model_path)
    // LogisticRegressionModel.load(sc, model_path)

    // Instantiate metrics object
    // BinaryClassificationMetrics does the evaluation
    val metrics = new BinaryClassificationMetrics(predictionAndLabels)

    // Precision by threshold
    // The threshold is the probability cut-off: above it a sample is positive (1), below it negative (0)
    // The loops below list the p, r, and F values for every threshold
    val precision = metrics.precisionByThreshold
    precision.foreach { case (t, p) =>
      println(s"Threshold: $t, Precision: $p")
    }

    // Recall by threshold
    val recall = metrics.recallByThreshold
    recall.foreach { case (t, r) =>
      println(s"Threshold: $t, Recall: $r")
    }

    // Precision-Recall Curve
    val PRC = metrics.pr

    // F-measure

    // beta is the factor in the F-measure computation:
    // it weights recall against precision (beta = 1 weights them equally)
    val f1Score = metrics.fMeasureByThreshold
    f1Score.foreach { case (t, f) =>
      println(s"Threshold: $t, F-score: $f, Beta = 1")
    }

    val beta = 0.5
    val fScore = metrics.fMeasureByThreshold(beta)
    // Iterate over fScore here; the original looped over f1Score again by mistake
    fScore.foreach { case (t, f) =>
      println(s"Threshold: $t, F-score: $f, Beta = 0.5")
    }

    // AUPRC: area under the precision-recall curve
    val auPRC = metrics.areaUnderPR
    println("Area under precision-recall curve = " + auPRC)

    // Compute thresholds used in ROC and PR curves
    val thresholds = precision.map(_._1)

    // ROC Curve
    val roc = metrics.roc

    // AUROC: area under the ROC curve, commonly called AUC
    val auROC = metrics.areaUnderROC
    println("Area under ROC = " + auROC)
    // $example off$
  }
}
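To make the "threshold" in the code comments concrete, the plain-Scala sketch below applies one cut-off to a handful of (probability, label) pairs and counts the four outcomes. It only illustrates the idea and does not use Spark; `ThresholdMetrics`, `Confusion`, and `confusionAt` are hypothetical names, and treating a score equal to the threshold as positive is an assumption.

```scala
// Illustrative sketch: confusion counts for one probability threshold.
object ThresholdMetrics {
  final case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int)

  // scored: (predicted probability, true label) pairs, label 1.0 = positive.
  def confusionAt(threshold: Double, scored: Seq[(Double, Double)]): Confusion = {
    // A sample is predicted positive when its score reaches the threshold (assumption: >=).
    val predicted = scored.map { case (score, label) => (score >= threshold, label == 1.0) }
    Confusion(
      tp = predicted.count { case (p, actual) => p && actual },
      fp = predicted.count { case (p, actual) => p && !actual },
      fn = predicted.count { case (p, actual) => !p && actual },
      tn = predicted.count { case (p, actual) => !p && !actual }
    )
  }
}
```

Repeating this for each distinct score gives one precision and one recall value per threshold, which is the shape of the per-threshold output printed by the job.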

This is the version deployed to a Spark cluster. To run it locally, set the master to local and add a warehouse directory:

    val conf = new SparkConf().setAppName("BinaryClassificationMetricsExample").setMaster("local").set("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse")

val data_path="data/mllib/sample_binary_classification_data.txt"
val model_path="data/mllib/LRModel"

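4 Submitting, deploying, and predicting

My directories:
Data directory: /home/jar/data
Jar directory: /home/jar
Model directory: /home/jar/model
Upload the jar to /home/jar on the master node (any node works), and make sure /home/jar/data on every slave node contains sample_binary_classification_data.txt.
Then run spark-submit to hand the job to the cluster: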

spark-submit  --class org.wq.scala.ml.LogicRegression --master spark://master:7077 --executor-memory 700m --num-executors 1  /home/jar/LRModel.jar  /home/jar/data/sample_binary_classification_data.txt  /home/jar/model/LRModel

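The output is too long to copy in full, so only the tail end is pasted here. The AUC judges the model as a whole and does not depend on the threshold; when picking the probability threshold for the model, look at the F values and choose a threshold where the F value is high.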

Area under precision-recall curve = 1.0
16/10/21 11:21:38 INFO SparkContext: Starting job: collect at SlidingRDD.scala:81
16/10/21 11:21:38 INFO DAGScheduler: Got job 42 (collect at SlidingRDD.scala:81) with 4 output partitions
16/10/21 11:21:38 INFO DAGScheduler: Final stage: ResultStage 60 (collect at SlidingRDD.scala:81)
16/10/21 11:21:38 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 59)
16/10/21 11:21:38 INFO DAGScheduler: Missing parents: List()
16/10/21 11:21:38 INFO DAGScheduler: Submitting ResultStage 60 (MapPartitionsRDD[89] at mapPartitions at SlidingRDD.scala:78), which has no missing parents
16/10/21 11:21:38 INFO MemoryStore: Block broadcast_46 stored as values in memory (estimated size 6.0 KB, free 413.7 MB)
16/10/21 11:21:38 INFO MemoryStore: Block broadcast_46_piece0 stored as bytes in memory (estimated size 3.1 KB, free 413.7 MB)
16/10/21 11:21:38 INFO BlockManagerInfo: Added broadcast_46_piece0 in memory on 192.168.247.132:43830 (size: 3.1 KB, free: 413.9 MB)
16/10/21 11:21:38 INFO SparkContext: Created broadcast 46 from broadcast at DAGScheduler.scala:1012
16/10/21 11:21:38 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 60 (MapPartitionsRDD[89] at mapPartitions at SlidingRDD.scala:78)
16/10/21 11:21:38 INFO TaskSchedulerImpl: Adding task set 60.0 with 4 tasks
16/10/21 11:21:38 INFO TaskSetManager: Starting task 1.0 in stage 60.0 (TID 89, 192.168.247.133, partition 1, PROCESS_LOCAL, 5309 bytes)
16/10/21 11:21:38 INFO TaskSetManager: Starting task 2.0 in stage 60.0 (TID 90, 192.168.247.134, partition 2, PROCESS_LOCAL, 5309 bytes)
16/10/21 11:21:38 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 89 on executor id: 1 hostname: 192.168.247.133.
16/10/21 11:21:38 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 90 on executor id: 0 hostname: 192.168.247.134.
16/10/21 11:21:38 INFO BlockManagerInfo: Added broadcast_46_piece0 in memory on 192.168.247.133:33302 (size: 3.1 KB, free: 225.9 MB)
16/10/21 11:21:38 INFO BlockManagerInfo: Added broadcast_46_piece0 in memory on 192.168.247.134:48184 (size: 3.1 KB, free: 225.9 MB)
16/10/21 11:21:38 INFO TaskSetManager: Starting task 0.0 in stage 60.0 (TID 91, 192.168.247.133, partition 0, PROCESS_LOCAL, 5620 bytes)
16/10/21 11:21:38 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 91 on executor id: 1 hostname: 192.168.247.133.
16/10/21 11:21:38 INFO TaskSetManager: Finished task 1.0 in stage 60.0 (TID 89) in 50 ms on 192.168.247.133 (1/4)
16/10/21 11:21:38 INFO TaskSetManager: Starting task 3.0 in stage 60.0 (TID 92, 192.168.247.134, partition 3, PROCESS_LOCAL, 5620 bytes)
16/10/21 11:21:38 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 92 on executor id: 0 hostname: 192.168.247.134.
16/10/21 11:21:38 INFO TaskSetManager: Finished task 2.0 in stage 60.0 (TID 90) in 64 ms on 192.168.247.134 (2/4)
16/10/21 11:21:38 INFO TaskSetManager: Finished task 0.0 in stage 60.0 (TID 91) in 25 ms on 192.168.247.133 (3/4)
16/10/21 11:21:38 INFO TaskSetManager: Finished task 3.0 in stage 60.0 (TID 92) in 27 ms on 192.168.247.134 (4/4)
16/10/21 11:21:38 INFO TaskSchedulerImpl: Removed TaskSet 60.0, whose tasks have all completed, from pool 
16/10/21 11:21:38 INFO DAGScheduler: ResultStage 60 (collect at SlidingRDD.scala:81) finished in 0.084 s
16/10/21 11:21:38 INFO DAGScheduler: Job 42 finished: collect at SlidingRDD.scala:81, took 0.104986 s
16/10/21 11:21:38 INFO SparkContext: Starting job: aggregate at AreaUnderCurve.scala:45
16/10/21 11:21:38 INFO DAGScheduler: Got job 43 (aggregate at AreaUnderCurve.scala:45) with 3 output partitions
16/10/21 11:21:38 INFO DAGScheduler: Final stage: ResultStage 63 (aggregate at AreaUnderCurve.scala:45)
16/10/21 11:21:38 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 62)
16/10/21 11:21:38 INFO DAGScheduler: Missing parents: List()
16/10/21 11:21:38 INFO DAGScheduler: Submitting ResultStage 63 (SlidingRDD[88] at RDD at SlidingRDD.scala:50), which has no missing parents
16/10/21 11:21:38 INFO MemoryStore: Block broadcast_47 stored as values in memory (estimated size 6.1 KB, free 413.7 MB)
16/10/21 11:21:38 INFO MemoryStore: Block broadcast_47_piece0 stored as bytes in memory (estimated size 3.2 KB, free 413.7 MB)
16/10/21 11:21:38 INFO BlockManagerInfo: Added broadcast_47_piece0 in memory on 192.168.247.132:43830 (size: 3.2 KB, free: 413.9 MB)
16/10/21 11:21:38 INFO SparkContext: Created broadcast 47 from broadcast at DAGScheduler.scala:1012
16/10/21 11:21:38 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 63 (SlidingRDD[88] at RDD at SlidingRDD.scala:50)
16/10/21 11:21:38 INFO TaskSchedulerImpl: Adding task set 63.0 with 3 tasks
16/10/21 11:21:38 INFO TaskSetManager: Starting task 1.0 in stage 63.0 (TID 93, 192.168.247.133, partition 1, PROCESS_LOCAL, 5848 bytes)
16/10/21 11:21:38 INFO TaskSetManager: Starting task 2.0 in stage 63.0 (TID 94, 192.168.247.134, partition 2, PROCESS_LOCAL, 5848 bytes)
16/10/21 11:21:38 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 93 on executor id: 1 hostname: 192.168.247.133.
16/10/21 11:21:38 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 94 on executor id: 0 hostname: 192.168.247.134.
16/10/21 11:21:38 INFO BlockManagerInfo: Added broadcast_47_piece0 in memory on 192.168.247.133:33302 (size: 3.2 KB, free: 225.9 MB)
16/10/21 11:21:38 INFO BlockManagerInfo: Added broadcast_47_piece0 in memory on 192.168.247.134:48184 (size: 3.2 KB, free: 225.9 MB)
16/10/21 11:21:38 INFO TaskSetManager: Starting task 0.0 in stage 63.0 (TID 95, 192.168.247.133, partition 0, PROCESS_LOCAL, 6037 bytes)
16/10/21 11:21:38 INFO TaskSetManager: Finished task 1.0 in stage 63.0 (TID 93) in 53 ms on 192.168.247.133 (1/3)
16/10/21 11:21:38 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 95 on executor id: 1 hostname: 192.168.247.133.
16/10/21 11:21:38 INFO TaskSetManager: Finished task 2.0 in stage 63.0 (TID 94) in 63 ms on 192.168.247.134 (2/3)
16/10/21 11:21:38 INFO TaskSetManager: Finished task 0.0 in stage 63.0 (TID 95) in 21 ms on 192.168.247.133 (3/3)
16/10/21 11:21:38 INFO TaskSchedulerImpl: Removed TaskSet 63.0, whose tasks have all completed, from pool 
16/10/21 11:21:38 INFO DAGScheduler: ResultStage 63 (aggregate at AreaUnderCurve.scala:45) finished in 0.069 s
16/10/21 11:21:38 INFO DAGScheduler: Job 43 finished: aggregate at AreaUnderCurve.scala:45, took 0.108477 s
Area under ROC = 1.0
16/10/21 11:21:38 INFO SparkContext: Invoking stop() from shutdown hook
16/10/21 11:21:39 INFO SparkUI: Stopped Spark web UI at http://192.168.247.132:4041
16/10/21 11:21:39 INFO StandaloneSchedulerBackend: Shutting down all executors
16/10/21 11:21:39 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
16/10/21 11:21:39 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/10/21 11:21:39 INFO MemoryStore: MemoryStore cleared
16/10/21 11:21:39 INFO BlockManager: BlockManager stopped
16/10/21 11:21:39 INFO BlockManagerMaster: BlockManagerMaster stopped
16/10/21 11:21:39 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/10/21 11:21:39 INFO SparkContext: Successfully stopped SparkContext
16/10/21 11:21:39 INFO ShutdownHookManager: Shutdown hook called
16/10/21 11:21:39 INFO ShutdownHookManager: Deleting directory /tmp/spark-c96536c2-5fc6-4dab-b72e-30b588eaedbe
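
The threshold advice given before the log can be sketched as a tiny helper. This is an assumption-laden illustration rather than part of the job above: `ThresholdPicker` and `bestByFScore` are hypothetical names, and the input is assumed to be the (threshold, F-score) pairs already collected to the driver.

```scala
// Illustrative sketch: pick the threshold with the highest F-score.
object ThresholdPicker {
  // Input: (threshold, fScore) pairs; returns the pair with the largest F-score,
  // or None when no pairs were supplied.
  def bestByFScore(fByThreshold: Seq[(Double, Double)]): Option[(Double, Double)] =
    if (fByThreshold.isEmpty) None else Some(fByThreshold.maxBy(_._2))
}
```

In the job above, the pairs could come from `metrics.fMeasureByThreshold.collect().toSeq`, since that call yields one (threshold, F-score) pair per distinct score.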
