Java语言在Spark3.2.4集群中使用Spark MLlib库完成XGboost算法
Posted ^王晓明^
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java语言在Spark3.2.4集群中使用Spark MLlib库完成XGboost算法相关的知识,希望对你有一定的参考价值。
一、概述
XGBoost是一种基于决策树的集成学习算法,它在处理结构化数据方面表现优异。相比其他算法,XGBoost能够处理大量特征和样本,并且支持通过正则化控制模型的复杂度。XGBoost也可以自动进行特征选择并对缺失值进行处理。
二、代码实现步骤
1、导入相关库
import org.apache.spark.ml.Pipeline; import org.apache.spark.ml.evaluation.RegressionEvaluator; import org.apache.spark.ml.feature.VectorAssembler; import org.apache.spark.ml.regression.GBTRegressionModel, GBTRegressor; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SparkSession;
2、加载数据
SparkSession spark = SparkSession.builder().appName("XGBoost").master("local[*]").getOrCreate();
DataFrame data = spark.read().option("header", "true").option("inferSchema", "true").csv("data.csv");
3、准备特征向量
String[] featureCols = data.columns(); featureCols = Arrays.copyOfRange(featureCols, 0, featureCols.length - 1); VectorAssembler assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features"); DataFrame inputData = assembler.transform(data).select("features", "output"); inputData.show(false);
4、划分训练集和测试集
double[] weights = 0.7, 0.3; DataFrame[] splitData = inputData.randomSplit(weights); DataFrame train = splitData[0]; DataFrame test = splitData[1];
5、定义XGBoost模型
GBTRegressor gbt = new GBTRegressor() .setLabelCol("output") .setFeaturesCol("features") .setMaxIter(100) .setStepSize(0.1) .setMaxDepth(6) .setLossType("squared") .setFeatureSubsetStrategy("auto");
6、构建管道
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]gbt);
7、训练模型
GBTRegressionModel model = (GBTRegressionModel) pipeline.fit(train).stages()[0];
8、进行预测并评估模型
DataFrame predictions = model.transform(test); predictions.show(false); RegressionEvaluator evaluator = new RegressionEvaluator() .setMetricName("rmse") .setLabelCol("output") .setPredictionCol("prediction"); double rmse = evaluator.evaluate(predictions); System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
以上就是Java语言中基于SparkML的XGBoost算法实现的示例代码。需要注意的是,这里使用了GBTRegressor作为XGBoost的实现方式,但是也可以使用其他实现方式,例如XGBoostRegressor或者XGBoostClassification。
三、完整代码
import org.apache.spark.ml.Pipeline; import org.apache.spark.ml.evaluation.RegressionEvaluator; import org.apache.spark.ml.feature.VectorAssembler; import org.apache.spark.ml.regression.GBTRegressionModel, GBTRegressor; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SparkSession; import java.util.Arrays; public class XGBoostExample public static void main(String[] args) SparkSession spark = SparkSession.builder().appName("XGBoost").master("local[*]").getOrCreate(); // 加载数据 DataFrame data = spark.read().option("header", "true").option("inferSchema", "true").csv("data.csv"); data.printSchema(); data.show(false); // 准备特征向量 String[] featureCols = data.columns(); featureCols = Arrays.copyOfRange(featureCols, 0, featureCols.length - 1); VectorAssembler assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features"); DataFrame inputData = assembler.transform(data).select("features", "output"); inputData.show(false); // 划分训练集和测试集 double[] weights = 0.7, 0.3; DataFrame[] splitData = inputData.randomSplit(weights); DataFrame train = splitData[0]; DataFrame test = splitData[1]; // 定义XGBoost模型 GBTRegressor gbt = new GBTRegressor() .setLabelCol("output") .setFeaturesCol("features") .setMaxIter(100) .setStepSize(0.1) .setMaxDepth(6) .setLossType("squared") .setFeatureSubsetStrategy("auto"); // 构建管道 Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]gbt); // 训练模型 GBTRegressionModel model = (GBTRegressionModel) pipeline.fit(train).stages()[0]; // 进行预测并评估模型 DataFrame predictions = model.transform(test); predictions.show(false); RegressionEvaluator evaluator = new RegressionEvaluator() .setMetricName("rmse") .setLabelCol("output") .setPredictionCol("prediction"); double rmse = evaluator.evaluate(predictions); System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse); spark.stop();
Docker-Compose构建spark集群
0.前言
知道我的兄弟朋友们可能知道,我最近工作中,我们老大叫我利用Docker构建大数据开发环境。今天真的十分开心,我利用Docker-Compose构建出Kafka-Spark-MySQL的云平台大数据开发环境,并利用自己撰写的spark streaming程序实现了实时数据的处理。
本篇文章主要介绍利用Docker-Compose构建出Spark的运行环境,和前面的Kafka和MySQL相似,本次搭建的Spark集群还是采用Bitnami的镜像进行构建。
1.单节点的Spark的构建
下面是利用Docker-Compose构建的Spark的单节点,生产环境不建议这样配置。下面是单节点的配置文件
version: '2'
services:
spark:
image: docker.io/bitnami/spark:3.3
ports:
- '8080:8080'
environment:
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=1G
- SPARK_WORKER_CORES=1
2.Spark一主两从节点的构建
2.1Docker-Compose配置文件
下面是Spark一主两从的节点配置脚本
version: '2'
services:
spark:
image: docker.io/bitnami/spark:3.3
environment:
- SPARK_MODE=master
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
ports:
- '8080:8080'
spark-worker-1:
image: docker.io/bitnami/spark:3.3
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=1G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
spark-worker-2:
image: docker.io/bitnami/spark:3.3
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=1G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
2.2 环境配置参数
下面是Spark集群搭建的一些可用变量:
- SPARK_MODE:集群模式启动 Apache Spark。有效值:主值、辅助角色。默认值:master
- SPARK_MASTER_URL:工作人员可以找到主节点的 URL。仅当spark模式为辅助角色时才需要。默认值:spark://spark-master:7077
- SPARK_RPC_AUTHENTICATION_ENABLED:启用 RPC 身份验证。默认值:no
- SPARK_RPC_AUTHENTICATION_SECRET:用于 RPC 身份验证的密钥。无默认值。
- SPARK_RPC_ENCRYPTION_ENABLED:启用 RPC 加密。默认值:no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED:启用本地存储加密:默false
- SPARK_SSL_ENABLED:启用 SSL 配置。默认值:
- SPARK_SSL_KEY_PASSWORD:密钥存储中私钥的密码。无默认值。
- SPARK_SSL_KEYSTORE_FILE:密钥存储的位置。默认值:/opt/bitnami/spark/conf/certs/spark-keystore.jks.
- SPARK_SSL_KEYSTORE_PASSWORD:密钥存储的密码。无默认值
- SPARK_SSL_TRUSTSTORE_PASSWORD:信任存储的密码。无默认值。
- SPARK_SSL_TRUSTSTORE_FILE:密钥存储的位置。默认值:/opt/bitnami/spark/conf/certs/spark-truststore.jks.
- SPARK_SSL_NEED_CLIENT_AUTH:是否需要客户端身份验证。默认值:是
- SPARK_SSL_PROTOCOL:要使用的 TLS 协议。默认值:TLSv1.2
- SPARK_DAEMON_USER:容器以 root 身份启动时的 Apache Spark 系统用户。默认值:spark
- SPARK_DAEMON_GROUP:容器以 root 身份启动时,Apache Spark 系统组。默认值:spark
以上是关于Java语言在Spark3.2.4集群中使用Spark MLlib库完成XGboost算法的主要内容,如果未能解决你的问题,请参考以下文章