Spark Machine Learning Basics (Part 3)
Posted by chenxiangzhen
Supervised Learning
0. Linear regression (with L1/L2 regularization)
from __future__ import print_function
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LinearRegressionWithElasticNet").getOrCreate()
# Load the training data
training = spark.read.format("libsvm").load("sample_linear_regression_data.txt")
# Linear regression with elastic-net regularization (elasticNetParam mixes L1 and L2 penalties)
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Print the coefficients and intercept
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))
# Summarize the model over the training set and print some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
spark.stop()
Coefficients: [0.0,0.32292516677405936,-0.3438548034562218,1.9156017023458414,0.05288058680386263,0.765962720459771,0.0,-0.15105392669186682,-0.21587930360904642,0.22025369188813426]
Intercept: 0.1598936844239736
numIterations: 7
objectiveHistory: [0.49999999999999994, 0.4967620357443381, 0.4936361664340463, 0.4936351537897608, 0.4936351214177871, 0.49363512062528014, 0.4936351206216114]
+--------------------+
| residuals|
+--------------------+
| -9.889232683103197|
| 0.5533794340053554|
| -5.204019455758823|
| -20.566686715507508|
| -9.4497405180564|
| -6.909112502719486|
| -10.00431602969873|
| 2.062397807050484|
| 3.1117508432954772|
| -15.893608229419382|
| -5.036284254673026|
| 6.483215876994333|
| 12.429497299109002|
| -20.32003219007654|
| -2.0049838218725005|
| -17.867901734183793|
| 7.646455887420495|
| -2.2653482182417406|
|-0.10308920436195645|
| -1.380034070385301|
+--------------------+
only showing top 20 rows
RMSE: 10.189077
r2: 0.022861
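Once fitted, the model can score new rows with transform(), which appends a "prediction" column. A minimal sketch (reusing the training DataFrame purely for illustration; it must run before spark.stop()):
# Apply the fitted model to a DataFrame with a "features" column
predictions = lrModel.transform(training)
predictions.select("features", "label", "prediction").show(5)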
1. Generalized linear models
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.regression import GeneralizedLinearRegression
spark = SparkSession.builder.appName("GeneralizedLinearRegressionExample").getOrCreate()
# Load the training data
dataset = spark.read.format("libsvm").load("sample_linear_regression_data.txt")
# Generalized linear model (Gaussian family with identity link)
glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)
# Fit the model
model = glr.fit(dataset)
# Print the coefficients and intercept
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
# Summarize the model over the training set and print some metrics
summary = model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()
spark.stop()
Coefficients: [0.010541828081257216,0.8003253100560949,-0.7845165541420371,2.3679887171421914,0.5010002089857577,1.1222351159753026,-0.2926824398623296,-0.49837174323213035,-0.6035797180675657,0.6725550067187461]
Intercept: 0.14592176145232041
Coefficient Standard Errors: [0.7950428434287478, 0.8049713176546897, 0.7975916824772489, 0.8312649247659919, 0.7945436200517938, 0.8118992572197593, 0.7919506385542777, 0.7973378214726764, 0.8300714999626418, 0.7771333489686802, 0.463930109648428]
T Values: [0.013259446542269243, 0.9942283563442594, -0.9836067393599172, 2.848657084633759, 0.6305509179635714, 1.382234441029355, -0.3695715687490668, -0.6250446546128238, -0.7271418403049983, 0.8654306337661122, 0.31453393176593286]
P Values: [0.989426199114056, 0.32060241580811044, 0.3257943227369877, 0.004575078538306521, 0.5286281628105467, 0.16752945248679119, 0.7118614002322872, 0.5322327097421431, 0.467486325282384, 0.3872259825794293, 0.753249430501097]
Dispersion: 105.60988356821714
Null Deviance: 53229.3654338832
Residual Degree Of Freedom Null: 500
Deviance: 51748.8429484264
Residual Degree Of Freedom: 490
AIC: 3769.1895871765314
Deviance Residuals:
+-------------------+
| devianceResiduals|
+-------------------+
|-10.974359174246889|
| 0.8872320138420559|
| -4.596541837478908|
|-20.411667435019638|
|-10.270419345342642|
|-6.0156058956799905|
|-10.663939415849267|
| 2.1153960525024713|
| 3.9807132379137675|
|-17.225218272069533|
| -4.611647633532147|
| 6.4176669407698546|
| 11.407137945300537|
| -20.70176540467664|
| -2.683748540510967|
|-16.755494794232536|
| 8.154668342638725|
|-1.4355057987358848|
|-0.6435058688185704|
| -1.13802589316832|
+-------------------+
only showing top 20 rows
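The same estimator supports other exponential-family distributions and link functions. A hypothetical variant, assuming a DataFrame count_data whose label column holds non-negative counts:
# Poisson regression with a log link (sketch; count_data is a placeholder DataFrame)
glr_poisson = GeneralizedLinearRegression(family="poisson", link="log", maxIter=10, regParam=0.3)
# poisson_model = glr_poisson.fit(count_data)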
2. Logistic regression
from __future__ import print_function
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LogisticRegressionSummary").getOrCreate()
# Load the training data
training = spark.read.format("libsvm").load("sample_libsvm_data.txt")
# Logistic regression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Summarize the model over the training set
trainingSummary = lrModel.summary
# Objective (loss) value at each iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)
# ROC curve
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))
# Set the model threshold to maximize F-Measure (a working sketch follows the output below)
#fMeasure = trainingSummary.fMeasureByThreshold
#maxFMeasure = fMeasure.groupBy(['threshold']).max('F-Measure').select('max(F-Measure)')
#bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure.select('max(F-Measure)')['max(F-Measure)']).select('threshold')['threshold']
#lr.setThreshold(bestThreshold)
spark.stop()
objectiveHistory:
0.6833149135741672
0.6662875751473734
0.6217068546034618
0.6127265245887887
0.6060347986802873
0.6031750687571562
0.5969621534836274
0.5940743031983118
0.5906089243339022
0.5894724576491042
0.5882187775729587
+---+--------------------+
|FPR| TPR|
+---+--------------------+
|0.0| 0.0|
|0.0|0.017543859649122806|
|0.0| 0.03508771929824561|
|0.0| 0.05263157894736842|
|0.0| 0.07017543859649122|
|0.0| 0.08771929824561403|
|0.0| 0.10526315789473684|
|0.0| 0.12280701754385964|
|0.0| 0.14035087719298245|
|0.0| 0.15789473684210525|
|0.0| 0.17543859649122806|
|0.0| 0.19298245614035087|
|0.0| 0.21052631578947367|
|0.0| 0.22807017543859648|
|0.0| 0.24561403508771928|
|0.0| 0.2631578947368421|
|0.0| 0.2807017543859649|
|0.0| 0.2982456140350877|
|0.0| 0.3157894736842105|
|0.0| 0.3333333333333333|
+---+--------------------+
only showing top 20 rows
areaUnderROC: 1.0
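The commented-out threshold tuning above can be completed as sketched below: find the threshold with the highest F-measure in the training summary and set it on the estimator (run before spark.stop(); column names assume the PySpark 2.x binary summary API):
# Choose the classification threshold that maximizes F-measure on the training data
fMeasure = trainingSummary.fMeasureByThreshold   # DataFrame with columns "threshold" and "F-Measure"
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()[0]
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure).select('threshold').head()[0]
lr.setThreshold(bestThreshold)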
from __future__ import print_function
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LogisticRegressionWithElasticNet").getOrCreate()
# Load the training data
training = spark.read.format("libsvm").load("sample_libsvm_data.txt")
# Binomial logistic regression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Coefficients and intercept
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
# The same binary problem fit with the multinomial family
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")
# Fit the model
mlrModel = mlr.fit(training)
# Print the coefficient matrix and intercept vector
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))
spark.stop()
Coefficients: (692,[244,263,272,300,301,328,350,351,378,379,405,406,407,428,433,434,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.353983524188197e-05,-9.102738505589466e-05,-0.00019467430546904298,-0.00020300642473486668,-3.1476183314863995e-05,-6.842977602660743e-05,1.5883626898239883e-05,1.4023497091372047e-05,0.00035432047524968605,0.00011443272898171087,0.00010016712383666666,0.0006014109303795481,0.0002840248179122762,-0.00011541084736508837,0.000385996886312906,0.000635019557424107,-0.00011506412384575676,-0.00015271865864986808,0.0002804933808994214,0.0006070117471191634,-0.0002008459663247437,-0.0001421075579290126,0.0002739010341160883,0.00027730456244968115,-9.838027027269332e-05,-0.0003808522443517704,-0.00025315198008555033,0.00027747714770754307,-0.0002443619763919199,-0.0015394744687597765,-0.00023073328411331293])
Intercept: 0.22456315961250325
Multinomial coefficients: 2 X 692 CSRMatrix
(0,244) 0.0
(0,263) 0.0001
(0,272) 0.0001
(0,300) 0.0001
(0,350) -0.0
(0,351) -0.0
(0,378) -0.0
(0,379) -0.0
(0,405) -0.0
(0,406) -0.0006
(0,407) -0.0001
(0,428) 0.0001
(0,433) -0.0
(0,434) -0.0007
(0,455) 0.0001
(0,456) 0.0001
..
..
Multinomial intercepts: [-0.12065879445860686,0.12065879445860686]
3. Multiclass logistic regression
from __future__ import print_function
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MulticlassLogisticRegressionWithElasticNet").getOrCreate()
# Load the training data
training = spark.read.format("libsvm").load("sample_multiclass_classification_data.txt")
# Multiclass (multinomial) logistic regression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Print the coefficient matrix and intercept vector
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))
# Predictions on the training data
lrModel.transform(training).show()
spark.stop()
Coefficients:
3 X 4 CSRMatrix
(0,3) 0.3176
(1,2) -0.7804
(1,3) -0.377
Intercept: [0.05165231659832854,-0.12391224990853622,0.07225993331020768]
+-----+--------------------+--------------------+--------------------+----------+
|label| features| rawPrediction| probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
| 1.0|(4,[0,1,2,3],[-0....|[-0.2130545101220...|[0.19824091021950...| 1.0|
| 1.0|(4,[0,1,2,3],[-0....|[-0.2395254151479...|[0.18250386256254...| 1.0|
| 1.0|(4,[0,1,2,3],[-0....|[-0.2130545101220...|[0.18980556250236...| 1.0|
| 1.0|(4,[0,1,2,3],[-0....|[-0.2395254151479...|[0.19632523546632...| 1.0|
| 0.0|(4,[0,1,2,3],[0.1...|[0.21047647616023...|[0.43750398183438...| 0.0|
| 1.0|(4,[0,2,3],[-0.83...|[-0.2395254151479...|[0.18250386256254...| 1.0|
| 2.0|(4,[0,1,2,3],[-1....|[0.07812299927036...|[0.37581775428218...| 0.0|
| 2.0|(4,[0,1,2,3],[-1....|[0.05165230377890...|[0.35102739153795...| 2.0|
| 1.0|(4,[0,1,2,3],[-0....|[-0.2659960025254...|[0.17808226409449...| 1.0|
| 0.0|(4,[0,2,3],[0.611...|[0.18400588878268...|[0.44258017540583...| 0.0|
| 0.0|(4,[0,1,2,3],[0.2...|[0.23694706353777...|[0.44442301486604...| 0.0|
| 1.0|(4,[0,1,2,3],[-0....|[-0.2659960025254...|[0.17539206930356...| 1.0|
| 1.0|(4,[0,1,2,3],[-0....|[-0.2395254151479...|[0.18250386256254...| 1.0|
| 2.0|(4,[0,1,2,3],[-0....|[0.05165230377890...|[0.35371124645092...| 2.0|
| 2.0|(4,[0,1,2,3],[-0....|[-0.0277597631826...|[0.32360705108265...| 2.0|
| 2.0|(4,[0,1,2,3],[-0....|[0.02518163392628...|[0.33909561029444...| 2.0|
| 1.0|(4,[0,2,3],[-0.94...|[-0.2395254151479...|[0.17976563656243...| 1.0|
| 2.0|(4,[0,1,2,3],[-0....|[-0.0012891758050...|[0.32994371314262...| 2.0|
| 0.0|(4,[0,1,2,3],[0.1...|[0.10459380900173...|[0.39691355784123...| 0.0|
| 2.0|(4,[0,1,2,3],[-0....|[0.02518163392628...|[0.34718685710751...| 2.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
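The predictions above are on the training data; they can be scored with an evaluator to get a single accuracy figure. A short sketch (run before spark.stop()):
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Accuracy over the training predictions (default columns "label" and "prediction")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training accuracy = %g" % evaluator.evaluate(lrModel.transform(training)))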
4. Multilayer perceptron (MLP)
from __future__ import print_function
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("multilayer_perceptron_classification_example").getOrCreate()
# Load the data
data = spark.read.format("libsvm").load("sample_multiclass_classification_data.txt")
# Split the data into training and test sets
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
# Layer sizes: input layer of 4 features, two hidden layers of 5 and 4 units, output layer of 3 classes
layers = [4, 5, 4, 3]
# Create the multilayer perceptron trainer
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
# Train the model
model = trainer.fit(train)
# Compute predictions and accuracy on the test set
result = model.transform(test)
result.show()
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
spark.stop()
+-----+--------------------+--------------------+--------------------+----------+
|label| features| rawPrediction| probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
| 0.0|(4,[0,1,2,3],[-0....|[-29.588369001638...|[2.63020383878084...| 2.0|
| 0.0|(4,[0,1,2,3],[-0....|[125.657894478296...|[1.0,1.4484875476...| 0.0|
| 0.0|(4,[0,1,2,3],[-0....|[126.190155254739...|[1.0,5.1578089761...| 0.0|
| 0.0|(4,[0,1,2,3],[-0....|[-26.984478255346...|[4.23003198458660...| 2.0|
| 0.0|(4,[0,1,2,3],[-0....|[-29.588369001638...|[2.63020383878084...| 2.0|
| 0.0|(4,[0,1,2,3],[-1....|[-29.588368732563...|[2.63020459374897...| 2.0|
| 0.0|(4,[0,1,2,3],[0.1...|[126.190175711705...|[1.0,5.1572549882...| 0.0|
| 0.0|(4,[0,1,2,3],[0.2...|[126.190175994586...|[1.0,5.1572473280...| 0.0|
| 0.0|(4,[0,1,2,3],[0.3...|[126.190175994586...|[1.0,5.1572473280...| 0.0|
| 0.0|(4,[0,1,2,3],[0.3...|[126.190175994586...|[1.0,5.1572473280...| 0.0|
| 0.0|(4,[0,1,2,3],[0.3...|[126.190175994586...|[1.0,5.1572473280...| 0.0|
| 0.0|(4,[0,1,2,3],[0.4...|[126.190175994586...|[1.0,5.1572473280...| 0.0|
| 0.0|(4,[0,1,2,3],[0.5...|[126.190175994586...|[1.0,5.1572473280...| 0.0|
| 0.0|(4,[0,1,2,3],[0.7...|[126.190175994586...|[1.0,5.1572473280...| 0.0|
| 0.0|(4,[0,1,2,3],[0.8...|[126.190175994586...|[1.0,5.1572473280...| 0.0|
| 0.0|(4,[0,1,2,3],[1.0...|[126.190175994592...|[1.0,5.1572473278...| 0.0|
| 0.0|(4,[0,2,3],[0.166...|[126.190175994583...|[1.0,5.1572473280...| 0.0|
| 0.0|(4,[0,2,3],[0.388...|[126.190175994586...|[1.0,5.1572473280...| 0.0|
| 1.0|(4,[0,1,2,3],[-0....|[-122.71364090590...|[1.47439846164393...| 1.0|
| 1.0|(4,[0,1,2,3],[-0....|[-122.71364090590...|[1.47439846164393...| 1.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
Test set accuracy = 0.9019607843137255
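A trained model such as this one can be persisted and reloaded for later scoring. A sketch, where the save path is a placeholder:
from pyspark.ml.classification import MultilayerPerceptronClassificationModel
# Save the trained network and load it back (the path is hypothetical)
model.write().overwrite().save("mlp_model_path")
reloaded = MultilayerPerceptronClassificationModel.load("mlp_model_path")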
5. Decision tree classification
from __future__ import print_function
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DecisionTreeClassificationExample").getOrCreate()
# Load the data
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")
# Index the labels, converting label values to category indices
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Index categorical feature columns in the feature vectors
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
treeModel = model.stages[2]
# summary only
print(treeModel)
spark.stop()
+----------+------------+--------------------+
|prediction|indexedLabel| features|
+----------+------------+--------------------+
| 1.0| 1.0|(692,[95,96,97,12...|
| 1.0| 1.0|(692,[98,99,100,1...|
| 1.0| 1.0|(692,[123,124,125...|
| 1.0| 1.0|(692,[124,125,126...|
| 1.0| 1.0|(692,[124,125,126...|
+----------+------------+--------------------+
only showing top 5 rows
Test Error = 0.0833333
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_e2935d804502) of depth 1 with 3 nodes
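Beyond this one-line summary, the full set of learned decision rules can be printed. A small sketch (run before spark.stop()):
# Print the complete decision rules of the fitted tree
print(treeModel.toDebugString)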
6. Decision tree regression
from __future__ import print_function
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DecisionTreeRegressionExample").getOrCreate()
# Load the data
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")
# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])
# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
treeModel = model.stages[1]
# summary only
print(treeModel)
spark.stop()
+----------+-----+--------------------+
|prediction|label| features|
+----------+-----+--------------------+
| 0.0| 0.0|(692,[95,96,97,12...|
| 0.0| 0.0|(692,[98,99,100,1...|
| 0.0| 0.0|(692,[122,123,124...|
| 0.0| 0.0|(692,[123,124,125...|
| 0.0| 0.0|(692,[124,125,126...|
+----------+-----+--------------------+
only showing top 5 rows
Root Mean Squared Error (RMSE) on test data = 0.179605
DecisionTreeRegressionModel (uid=DecisionTreeRegressor_3c2c5981fa9b) of depth 1 with 3 nodes
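The depth-1 tree suggests this toy dataset is nearly separable on a single feature; deeper trees can be requested explicitly. A hypothetical variant of the estimator above:
# Allow deeper trees than the default (maxDepth defaults to 5)
dt_deep = DecisionTreeRegressor(featuresCol="indexedFeatures", maxDepth=10, minInstancesPerNode=2)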
7. Random forest classification
from __future__ import print_function
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RandomForestClassifierExample").getOrCreate()
# Load the data
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
rfModel = model.stages[2]
print(rfModel) # summary only
spark.stop()
+--------------+-----+--------------------+
|predictedLabel|label| features|
+--------------+-----+--------------------+
| 0.0| 0.0|(692,[122,123,124...|
| 0.0| 0.0|(692,[123,124,125...|
| 0.0| 0.0|(692,[124,125,126...|
| 0.0| 0.0|(692,[126,127,128...|
| 0.0| 0.0|(692,[126,127,128...|
+--------------+-----+--------------------+
only showing top 5 rows
Test Error = 0.0344828
RandomForestClassificationModel (uid=RandomForestClassifier_5669b6efb44b) with 10 trees
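Random forest models also expose impurity-based feature importances, which help interpret the 692-dimensional input. A sketch (run before spark.stop()):
# Sparse vector of feature importances, normalized to sum to 1.0
print(rfModel.featureImportances)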
8. Random forest regression
from __future__ import print_function
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RandomForestRegressorExample").getOrCreate()
# Load the data
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")
# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])
# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
rfModel = model.stages[1]
print(rfModel) # summary only
spark.stop()
+----------+-----+--------------------+
|prediction|label| features|
+----------+-----+--------------------+
| 0.05| 0.0|(692,[95,96,97,12...|
| 0.0| 0.0|(692,[98,99,100,1...|
| 0.05| 0.0|(692,[123,124,125...|
| 0.0| 0.0|(692,[124,125,126...|
| 0.05| 0.0|(692,[124,125,126...|
+----------+-----+--------------------+
only showing top 5 rows
Root Mean Squared Error (RMSE) on test data = 0.111163
RandomForestRegressionModel (uid=RandomForestRegressor_d557129aa0ea) with 20 trees
9. Gradient-boosted tree classification
from __future__ import print_function
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("GradientBoostedTreeClassifierExample").getOrCreate()
# Load the data
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)
# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
gbtModel = model.stages[2]
print(gbtModel) # summary only
spark.stop()
+----------+------------+--------------------+
|prediction|indexedLabel| features|
+----------+------------+--------------------+
| 1.0| 1.0|(692,[95,96,97,12...|
| 1.0| 1.0|(692,[100,101,102...|
| 1.0| 1.0|(692,[122,123,124...|
| 1.0| 1.0|(692,[123,124,125...|
| 1.0| 1.0|(692,[124,125,126...|
+----------+------------+--------------------+
only showing top 5 rows
Test Error = 0.125
GBTClassificationModel (uid=GBTClassifier_4a28928cccd2) with 10 trees
10. Gradient-boosted tree regression
from __future__ import print_function
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("GradientBoostedTreeRegressorExample").getOrCreate()
# Load the data
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)
# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])
# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
gbtModel = model.stages[1]
print(gbtModel) # summary only
spark.stop()
+----------+-----+--------------------+
|prediction|label| features|
+----------+-----+--------------------+
| 1.0| 0.0|(692,[98,99,100,1...|
| 0.0| 0.0|(692,[122,123,124...|
| 0.0| 0.0|(692,[123,124,125...|
| 0.0| 0.0|(692,[124,125,126...|
| 0.0| 0.0|(692,[124,125,126...|
+----------+-----+--------------------+
only showing top 5 rows
Root Mean Squared Error (RMSE) on test data = 0.2
GBTRegressionModel (uid=GBTRegressor_a891b3f3210d) with 10 trees
11. Machine learning pipelines and cross-validation
from __future__ import print_function
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CrossValidatorExample").getOrCreate()
# Prepare training documents, which are labeled.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
(8, "e spark program", 1.0),
(9, "a e c l", 0.0),
(10, "spark compile", 1.0),
(11, "hadoop software", 0.0)
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "mapreduce spark"),
(7, "apache hadoop")
], ["id", "text"])
# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)
spark.stop()
Row(id=4, text='spark i j k', probability=DenseVector([0.2581, 0.7419]), prediction=1.0)
Row(id=5, text='l m n', probability=DenseVector([0.9186, 0.0814]), prediction=0.0)
Row(id=6, text='mapreduce spark', probability=DenseVector([0.432, 0.568]), prediction=1.0)
Row(id=7, text='apache hadoop', probability=DenseVector([0.6766, 0.3234]), prediction=0.0)
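The fitted CrossValidator also records the average metric for every parameter combination and keeps the best model refit on all of the training data. A sketch of inspecting them (attribute names follow the PySpark 2.x tuning API):
# Average areaUnderROC for each of the 6 parameter settings, in grid order
print(cvModel.avgMetrics)
bestPipeline = cvModel.bestModel           # PipelineModel refit on the full training set
bestHashingTF = bestPipeline.stages[1]
print("numFeatures chosen: %d" % bestHashingTF.getNumFeatures())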