Java应用XIV使用Apache Spark MLlib构建机器学习模型下

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java应用XIV使用Apache Spark MLlib构建机器学习模型下相关的知识,希望对你有一定的参考价值。

如果您觉得本博客的内容对您有所帮助或启发,请关注我的博客,以便第一时间获取最新技术文章和教程。同时,也欢迎您在评论区留言,分享想法和建议。谢谢支持!

四、无监督学习

4.1 聚类

4.1.1 K-Means

K-Means是一种常见的无监督学习算法,用于将一组数据分成k个簇,使得每个数据点都属于离其最近的簇。K-Means的目标是最小化所有数据点到其所属簇中心的距离的平方和。

K-Means算法的基本流程如下:

  1. 随机选择k个点作为初始簇中心。
  2. 将每个数据点分配到距离其最近的簇中心。
  3. 根据分配的结果,更新每个簇的中心。
  4. 重复步骤2和3,直到簇中心不再变化或达到最大迭代次数。

接下来,我们将编写Java代码来演示如何使用Spark MLlib进行K-Means聚类。

首先,我们需要准备数据。我们将使用Iris数据集,该数据集包含3种不同类型的鸢尾花(Iris setosa,Iris virginica和Iris versicolor),每种类型50个样本。我们将从UCI Machine Learning Repository下载数据集。请确保在运行代码之前将数据集下载到本地并提供正确的路径。

接下来,我们将使用Spark读取数据并将其转换为DataFrame。然后,我们将提取特征并将数据分成训练集和测试集。

最后,我们将使用Spark MLlib的KMeans算法对数据进行聚类,并计算簇内误差平方和(SSE)和轮廓系数,以评估聚类的效果。

下面是Java代码:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class KMeansExample
public static void main(String[] args)
// 创建SparkConf对象
SparkConf conf = new SparkConf()
.setAppName("KMeansExample")
.setMaster("local[*]");

// 创建JavaSparkContext对象
JavaSparkContext sc = new JavaSparkContext(conf);

// 创建SparkSession对象
SparkSession spark = SparkSession.builder()
.config(conf)
.getOrCreate();

// 读取数据并转换为DataFrame
String path = "path/to/iris.data";
JavaRDD<String> data = sc.textFile(path);
JavaRDD<Row> rows = data.map(line ->
String[] parts = line.split(",");
double sepalLength = Double.parseDouble(parts[0]);
double sepalWidth = Double.parseDouble(parts[1]);
double petalLength = Double.parseDouble(parts[2]);
double petalWidth = Double.parseDouble(parts[3]);
String label = parts[4];
return RowFactory.create(sepalLength, sepalWidth, petalLength, petalWidth, label);
);
StructType schema = new StructType(new StructField[]
new StructField("sepal_length", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("sepal_width", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("petal_length", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("petal_width", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("label", DataTypes.StringType, false, Metadata.empty())
);
Dataset<Row> df = spark.createDataFrame(rows, schema);

// 提取特征向量
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[] "sepal_length", "sepal_width", "petal_length", "petal_width")
.setOutputCol("features");
Dataset<Row> dataWithFeatures = assembler.transform(df);

// 将数据分为训练集和测试集
double trainTestRatio = 0.7;
Dataset<Row>[] dataSplits = dataWithFeatures.randomSplit(new double[] trainTestRatio, 1.0 - trainTestRatio);
Dataset<Row> trainingData = dataSplits[0];
Dataset<Row> testData = dataSplits[1];

// 构建KMeans模型
int numClusters = 3;
int numIterations = 20;
KMeans kmeans = new KMeans()
.setK(numClusters)
.setMaxIter(numIterations);
KMeansModel model = kmeans.fit(trainingData);

// 预测测试集并计算SSE和轮廓系数
Dataset<Row> predictions = model.transform(testData);
ClusteringEvaluator evaluator = new ClusteringEvaluator();
double sse = model.computeCost(testData);
double silhouette = evaluator.evaluate(predictions);

// 打印SSE和轮廓系数
System.out.println("SSE: " + sse);
System.out.println("Silhouette: " + silhouette);

// 停止Spark
spark.stop();

4.1.2 二分K-Means

Spark MLlib的二分K-Means算法是一种基于K-Means的聚类算法,它通过不断将一个聚类划分成两个子聚类,直到达到用户定义的K值为止。相对于传统的K-Means算法,二分K-Means的结果更可靠,但需要更多的计算资源。

下面我们来讲解一下如何使用Spark MLlib进行二分K-Means聚类,以及如何编写一个完整可运行的Java程序。

首先,我们需要准备数据。这里我们使用UCI Machine Learning Repository中的Iris数据集,该数据集包含150个样本,每个样本有4个特征,共分为3类。

代码如下:

SparkConf conf = new SparkConf().setAppName("BisectingKMeansExample").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(jsc);

// 加载数据
JavaRDD<String> data = jsc.textFile("iris.data");
JavaRDD<Vector> parsedData = data.map(s ->
String[] sarray = s.split(",");
double[] values = new double[sarray.length - 1];
for (int i = 0; i < sarray.length - 1; i++)
values[i] = Double.parseDouble(sarray[i]);

return Vectors.dense(values);
);
parsedData.cache();

接着,我们可以使用BisectingKMeans类来进行聚类。需要注意的是,与K-Means不同,二分K-Means的K值并不是作为参数传递给算法,而是通过分割聚类来逐渐逼近目标K值。

代码如下:

// 建立模型
BisectingKMeans bkm = new BisectingKMeans()
.setK(3)
.setMaxIterations(20);

BisectingKMeansModel model = bkm.run(parsedData.rdd());

最后,我们可以使用model.predict()方法来对新数据进行聚类预测,并使用model.clusterCenters()方法来获取聚类中心点。

代码如下:

// 使用模型进行预测
JavaRDD<Integer> predictedCluster = model.predict(parsedData);
JavaRDD<String> predictedData = data.zip(predictedCluster).map(tuple2 -> tuple2._1() + "," + tuple2._2());

// 打印聚类结果
List<String> predictedDataList = predictedData.collect();
for (String predictedDatum : predictedDataList)
System.out.println(predictedDatum);


// 获取聚类中心点
Vector[] clusterCenters = model.clusterCenters();
for (Vector clusterCenter : clusterCenters)
System.out.println(clusterCenter);

输出聚类结果。

// 输出聚类结果
Vector[] centers = model.clusterCenters();
System.out.println("Cluster Centers: ");
for (Vector center : centers)
System.out.println(center);

Dataset<Row> transformed = model.transform(data);
System.out.println("Cluster Assignments:");
transformed.show();

其中,​​model.clusterCenters()​​​用于获取每个簇的中心点,​​model.transform(data)​​​用于将数据集中的每个点分配到最近的簇中,并生成新的DataFrame,新的DataFrame包含原始数据和预测的簇编号。我们可以通过调用​​show()​​方法来查看聚类结果。

完整的Java程序如下:

import org.apache.spark.ml.clustering.BisectingKMeans;
import org.apache.spark.ml.clustering.BisectingKMeansModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class BisectingKMeansDemo
public static void main(String[] args)
// 创建SparkSession
SparkSession spark = SparkSession.builder().appName("BisectingKMeansDemo").master("local[*]").getOrCreate();

// 加载数据
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// 训练模型
BisectingKMeans bkm = new BisectingKMeans().setK(2).setSeed(1);
BisectingKMeansModel model = bkm.fit(data);

// 输出聚类结果
Vector[] centers = model.clusterCenters();
System.out.println("Cluster Centers: ");
for (Vector center : centers)
System.out.println(center);

Dataset<Row> transformed = model.transform(data);
System.out.println("Cluster Assignments:");
transformed.show();

// 停止SparkSession
spark.stop();

4.1.3 高斯混合模型

Spark MLlib中的高斯混合模型(Gaussian Mixture Model,简称GMM)是一种无监督学习算法,用于聚类和密度估计。它通过将每个样本分配到多个高斯分布中的一个来模拟数据分布。GMM的一个重要应用是图像分割,其中每个聚类对应于图像中的一个区域。

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.clustering.GaussianMixture;
import org.apache.spark.ml.clustering.GaussianMixtureModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class GMMExample
public static void main(String[] args)

//创建SparkConf和JavaSparkContext
SparkConf conf = new SparkConf().setAppName("GaussianMixtureDemo").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(jsc);

//读取数据
JavaRDD<String> data = jsc.textFile("data/gmm_data.txt");

//将数据转换为DataFrame
JavaRDD<Row> parsedData = data.map(s ->
String[] sarray = s.split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++)
values[i] = Double.parseDouble(sarray[i]);

return RowFactory.create(values);
);
StructType schema = new StructType(new StructField[]
new StructField("features", new VectorUDT(), false, Metadata.empty())
);
Dataset<Row> dataset = sqlContext.createDataFrame(parsedData, schema);

//设置GaussianMixture模型参数
int k = 3;
GaussianMixture gaussianMixture = new GaussianMixture().setK(k).setSeed(1234L);
gaussianMixture.setMaxIter(10);
gaussianMixture.setTol(0.01);

//训练GaussianMixture模型
GaussianMixtureModel model = gaussianMixture.fit(dataset);

//输出每个高斯分布的权重、均值和协方差矩阵
for (int i = 0; i < k; i++)
System.out.printf("weight=%f\\nmu=%s\\ncov=\\n%s\\n",
model.weights()[i], Arrays.toString(model.gaussians()[i].mean().toArray()),
model.gaussians()[i].cov().toString());


//关闭JavaSparkContext
jsc.stop();

我们首先创建了一个SparkConf和JavaSparkContext对象,然后读取了数据。数据集是一个二维数据集,每一行代表一个样本,包含两个特征。

然后,我们将数据转换为DataFrame格式,并设置了GaussianMixture模型的参数。在这个例子中,我们设置了高斯混合模型的高斯分布数量为3,迭代次数为10,收敛阈值为0.01。

接着,我们使用GaussianMixture模型拟合数据,得到了训练后的GaussianMixtureModel对象。我们输出了每个高斯分布的权重、均值和协方差矩阵。

4.2 降维

4.2.1 主成分分析

Spark MLlib中的主成分分析(PCA)是一种降维技术,可用于将高维数据集转换为低维数据集,同时保留最重要的特征。PCA通常用于数据压缩和可视化,可以提高机器学习模型的效率和准确性。在本篇回答中,我们将讲解PCA的基本概念和Spark MLlib中的实现方法,并提供一个完整的可运行的Java示例。

PCA的基本概念 PCA是一种无监督学习算法,它将高维数据映射到低维数据,同时保留数据的最大方差。假设我们有一个m维的数据集,我们想将其降维到k维,那么PCA的基本步骤如下:

  1. 将数据集进行标准化,使其每个特征的平均值为0,方差为1。
  2. 计算数据集的协方差矩阵。
  3. 对协方差矩阵进行特征值分解。
  4. 选择k个最大特征值对应的特征向量,将它们作为主成分,组成投影矩阵。
  5. 将数据集乘以投影矩阵,得到降维后的数据集。

Spark MLlib中的PCA实现 Spark MLlib中的PCA实现使用了分布式计算技术,可以处理大规模数据集。下面是Spark MLlib中PCA的基本用法:

  1. 导入必要的类和方法。
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
  1. 创建SparkSession对象和数据集。
SparkSession spark = SparkSession.builder()
.appName("PCAExample")
.master("local[*]")
.getOrCreate();

// 创建数据集
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
  1. 将特征向量组装为向量列。
VectorAssembler assembler = new VectorAssembler()
.setInputCols(data.columns())
.setOutputCol("features");

Dataset<Row> assembledData = assembler.transform(data).select("features");
  1. 训练PCA模型。
PCAModel pcaModel = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(2)
.fit(assembledData);
  1. 使用PCA模型进行数据转换。
Dataset<Row> result = pcaModel.transform(assembledData).select("pcaFeatures");
result.show();

下面是一个相对完整的 Java 示例代码,用于演示如何使用 Spark MLlib 进行主成分分析:

import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PCADemo
public static void main(String[] args)

SparkSession spark = SparkSession.builder()
.appName("PCADemo")
.master("local[*]")
.getOrCreate();

// 读取数据集
Dataset<Row> irisData = spark.read().format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("iris.csv");

// 将特征列合并为一个向量列
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]"sepal_length", "sepal_width", "petal_length", "petal_width")
.setOutputCol("features");
Dataset<Row> irisDataWithFeatures = assembler.transform(irisData);

// 创建PCA模型
PCAModel pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(2)
.fit(irisDataWithFeatures);

// 对数据集进行PCA变换
Dataset<Row> transformedData = pca.transform(irisDataWithFeatures)
.select("pcaFeatures");

// 输出每个主成分解释的方差比例
Vector explainedVariance = pca.explainedVariance();
for (int i = 0; i < explainedVariance.size(); i++)
double ratio = explainedVariance.apply(i);
System.out.println("PC" + (i + 1) + " explains " + ratio * 100 + "% variance.");


// 输出降维后的数据
transformedData.show();

spark.stop();


在这个示例中,我们使用了鸢尾花数据集,将其四个特征列合并为一个向量列,并将其输入到 ​​PCA​​​ 模型中进行主成分分析。通过调用 ​​fit()​​​ 方法训练模型,然后使用 ​​transform()​​ 方法对数据集进行PCA变换,生成降维后的数据集。在输出中,我们还打印了每个主成分解释的方差比例。

4.2.2 特征选择

Spark MLlib提供了多种特征选择算法,其中最常用的是基于卡方检验的特征选择方法。下面我们就以基于卡方检验的特征选择算法为例,讲解Spark MLlib中的特征选择。

基于卡方检验的特征选择算法主要是通过计算特征和标签之间的相关性,来确定每个特征对于分类任务的重要性,进而对特征进行选择。在Spark MLlib中,我们可以使用ChiSqSelector来实现基于卡方检验的特征选择。ChiSqSelector会对每个特征进行卡方检验,然后根据设定的阈值来确定每个特征的重要性,最后将重要性高于阈值的特征选出来。选中的特征将被用于后续的建模任务。

下面是一个基于卡方检验的特征选择的Spark MLlib Java示例代码,该代码基于Iris数据集进行特征选择,选出最重要的两个特征:

import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PCADemo
public static void main(String[] args)
// 创建SparkSession
SparkSession spark = SparkSession.builder()
.appName("ChiSqSelectorDemo")
.master("local[*]")
.getOrCreate();

// 读取数据
Dataset<Row> data = spark.read().format("libsvm").load("data/sample_libsvm_data.txt");

// 合并特征
VectorAssembler assembler = new VectorAssembler()
.setInputCols(data.columns())
.setOutputCol("features");
Dataset<Row> features = assembler.transform(data).select("label", "features");

// 创建卡方选择器
ChiSqSelector selector = new ChiSqSelector()
.setNumTopFeatures(2) // 选取卡方值最大的两个特征
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selectedFeatures");

// 训练模型
ChiSqSelectorModel model = selector.fit(features);

// 应用模型
Dataset<Row> result = model.transform(features);

// 输出结果
result.show();

// 关闭SparkSession
spark.stop();


在这个示例中,我们首先使用​​libsvm​​​格式读取样本数据,并将其转换为​​Dataset<Row>​​​类型。接下来,我们使用​​VectorAssembler​​​将所有特征列合并为一列,命名为​​features​​​。然后,我们创建一个​​ChiSqSelector​​​对象,将​​features​​​列和​​label​​​列设置为其输入,并设置​​numTopFeatures​​​参数为2,即选取卡方值最大的两个特征。最后,我们使用​​fit​​​方法训练模型,并使用​​transform​​方法将特征集转换为卡方值最大的特征集。

需要注意的是,卡方选择器只能处理非负特征。如果数据集中包含负值特征,需要先将其转换为非负特征。

五、模型评估和调优

5.1 交叉验证

交叉验证是一种常用的模型评估方法,通过将数据集划分为若干个子集,然后用其中一个子集作为验证集,其他子集作为训练集,多次训练模型并进行验证,最终得到模型的平均性能。

在 Spark MLlib 中,提供了两种交叉验证方法:K-Fold 交叉验证和随机划分交叉验证。

K-Fold 交叉验证是将数据集划分为 K 个互不相交的子集,每个子集均被用作验证集一次,其余的 K-1 个子集被用作训练集,最终得到 K 个模型的平均性能。

随机划分交叉验证是将数据集随机划分为训练集和测试集,其中训练集用于训练模型,测试集用于评估模型性能。

下面,我们来看一个 Spark MLlib 的交叉验证的示例代码。

// 加载数据集
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// 构建逻辑回归模型
LogisticRegression lr = new LogisticRegression();

// 设置参数
ParamGridBuilder paramGridBuilder = new ParamGridBuilder()
.addGrid(lr.regParam(), new double[]0.1, 0.01)
.addGrid(lr.elasticNetParam(), new double[]0.0, 0.5, 1.0)
.addGrid(lr.maxIter(), new int[]10, 100)
.addGrid(lr.threshold(), new double[]0.45, 0.5, 0.55);

// 构建交叉验证器
CrossValidator crossValidator = new CrossValidator()
.setEstimator(lr)
.setEstimatorParamMaps(paramGridBuilder.build())
.setEvaluator(new BinaryClassificationEvaluator());

// 运行交叉验证
CrossValidatorModel cvModel = crossValidator.fit(data);

// 输出最佳模型参数
System.out.println(cvModel.bestModel().extractParamMap());

上面的代码中,我们首先加载了一个数据集,然后构建了一个逻辑回归模型。接着,我们使用 ParamGridBuilder 构建了一个参数网格,其中包含了多个参数组合。接着,我们使用 CrossValidator 构建了一个交叉验证器,并设置了模型、参数网格和评估器。最后,我们使用 fit 方法运行交叉验证,并输出最佳模型参数。

需要注意的是,交叉验证是一种非常耗时的操作,因此在大规模数据集和复杂模型中,需要适当减少交叉验证的次数和参数组合数,以提高效率。

5.2 参数调优

对于模型评估和调优,除了交叉验证外,另一种常用的方法是参数调优。在机器学习中,模型的性能很大程度上取决于所使用的参数。因此,通过调整参数,可以提高模型的性能。参数调优的目标是找到最佳参数组合以最大程度地提高模型的性能。

在Spark MLlib中,可以使用ParamGridBuilder和CrossValidator类进行参数调优。ParamGridBuilder允许构建参数网格,CrossValidator则执行交叉验证和参数调优。

具体而言,我们可以使用ParamGridBuilder来创建不同参数组合的网格,例如:

ParamGridBuilder paramGrid = new ParamGridBuilder();
paramGrid.addGrid(lr.regParam(), new double[] 0.1, 0.01);
paramGrid.addGrid(lr.elasticNetParam(), new double[] 0.0, 0.5, 1.0);

这里我们创建了一个包含两个参数的网格,一个是regParam,一个是elasticNetParam。regParam控制L2正则化,elasticNetParam控制L1正则化和L2正则化的组合。我们将每个参数设置为不同的值,以便在训练模型时进行尝试。

然后,我们可以将这个参数网格传递给CrossValidator类,使用它来执行交叉验证和参数调优。例如:

CrossValidator crossValidator = new CrossValidator()
.setEstimator(lr)
.setEvaluator(new RegressionEvaluator())
.setEstimatorParamMaps(paramGrid.build())
.setNumFolds(3);

这里我们创建了一个CrossValidator实例,将LogisticRegression作为评估器,将RegressionEvaluator作为评估器评估模型性能。我们将ParamGridBuilder创建的参数网格传递给setEstimatorParamMaps方法,并将折叠数设置为3。最后,我们可以使用fit方法拟合模型,如下所示:

CrossValidatorModel cvModel = crossValidator.fit(data);

在这个例子中,我们使用CrossValidator来拟合一个LogisticRegression模型,并尝试不同的参数组合。通过交叉验证,我们可以找到最佳参数组合,以提高模型性能。

需要注意的是,参数调优可能会消耗大量的计算资源和时间。因此,通常需要选择合适的参数范围和较少的参数组合,以便更有效地进行调优。

六、模型部署

6.1 模型保存和加载

模型的保存和加载是将训练好的模型保存到磁盘上,以便后续可以直接加载使用。在Spark MLlib中,可以使用​​org.apache.spark.ml.PipelineModel​​类来保存和加载模型。

模型保存和加载的常见步骤如下:

  1. 通过Spark MLlib训练得到一个模型,例如线性回归模型、分类模型等。
  2. 将模型保存到磁盘上,例如本地磁盘或HDFS等。在保存时需要指定保

    以上是关于Java应用XIV使用Apache Spark MLlib构建机器学习模型下的主要内容,如果未能解决你的问题,请参考以下文章

    如何在现有的 Web 应用程序中使用 apache spark

    intellij 中 spark scala 应用程序中的线程“main”java.lang.NoClassDefFoundError:org/apache/spark/sql/catalyst/St

    找不到 Spark 和 Cassandra Java 应用程序异常提供程序 org.apache.hadoop.fs.s3.S3FileSystem

    使用java开发spark的wordcount程序

    使用 Spark 中的共享变量

    在 spark java api( org.apache.spark.SparkException ) 中使用 filter(),map(),... 时出错