Creating a Pipeline in Spark 2.1.1
A Pipeline represents a workflow: it is the class Spark ML uses to chain and control the stages of a machine learning process.
The code for building one, together with the complete workflow, is shown below.
First approach:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.ml.linalg.Vector

/**
  * Created by xiaopengpeng on 2016/12/20.
  */
class Popeline_ {
}

object Popeline_ {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("popeline")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "warehouse/dir")
      .getOrCreate()
    // Create the raw training data
    val training = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")
    // Create the tokenizer
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    // Create the HashingTF feature extractor
    val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")
    // Create the model (estimator)
    val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
    // Create the pipeline
    val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
    // Train the model
    val model = pipeline.fit(training)
    // Save the fitted model to disk
    model.write.overwrite().save("result/model/popeline")
    // Save the unfitted pipeline to disk
    pipeline.write.overwrite().save("result/unmodel/poeline")
    // Load the fitted model back from disk
    val sameModel = PipelineModel.load("result/model/popeline")
    // Create the test data
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "mapreduce spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")
    // Print the predictions on the test data
    model.transform(test).select("id", "text", "probability", "prediction").collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }
    spark.stop()
  }
}
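For completeness, the unfitted Pipeline saved above can be reloaded and refitted as well. A minimal sketch, assuming the same save paths as in the example and running inside the same main method (so the existing imports and the training DataFrame are in scope):

    // Reload the unfitted pipeline: its stages and parameters, but no trained model
    val samePipeline = Pipeline.load("result/unmodel/poeline")
    // Fitting it again on the training data produces an equivalent PipelineModel
    val refitModel = samePipeline.fit(training)
    // The reloaded fitted model can also be inspected stage by stage
    sameModel.stages.foreach(stage => println(stage.getClass.getSimpleName))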
Second approach:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Created by xiaopengpeng on 2016/12/20.
  */
class Popeline_2 {
}

object Popeline_2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("pipeline")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "warehouse/dir")
      .getOrCreate()
    // Create the training data (the original had misplaced parentheses around the first tuple)
    val training = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")
    val lr = new LogisticRegression()
    println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
    lr.setMaxIter(10).setRegParam(0.01)
    val model1 = lr.fit(training)
    println("Model 1 was fit using parameters: " + model1.parent.extractParamMap())
    val paramMap = ParamMap(lr.maxIter -> 20)
      .put(lr.maxIter -> 30) // this overrides the previous maxIter value
      .put(lr.regParam -> 0.1, lr.threshold -> 0.55)
    val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // change the output column name
    val paramMapCombined = paramMap ++ paramMap2
    val model2 = lr.fit(training, paramMapCombined)
    println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)
    val test = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      (0.0, Vectors.dense(3.0, 2.0, -0.1)),
      (1.0, Vectors.dense(0.0, 2.2, -1.5))
    )).toDF("label", "features")
    model2.transform(test)
      .select("features", "label", "myProbability", "prediction")
      .collect()
      .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
        println(s"($features, $label) -> prob=$prob, prediction=$prediction")
      }
    spark.stop()
  }
}
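To confirm that the ParamMap overrides actually reached the fitted model, the LogisticRegressionModel exposes both the learned coefficients and the resolved hyperparameter values. A minimal sketch that would continue inside the same main method, before spark.stop():

    // What the model learned from the training data
    println(s"coefficients = ${model2.coefficients}, intercept = ${model2.intercept}")
    // The hyperparameter values carried over from paramMapCombined (maxIter -> 30, regParam -> 0.1, threshold -> 0.55)
    println(s"maxIter = ${model2.getMaxIter}, regParam = ${model2.getRegParam}, threshold = ${model2.getThreshold}")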