Java语言在Spark3.2.4集群中使用Spark MLlib库完成XGboost算法

Posted ^王晓明^

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java语言在Spark3.2.4集群中使用Spark MLlib库完成XGboost算法相关的知识,希望对你有一定的参考价值。

一、概述

XGBoost是一种基于决策树的集成学习算法,它在处理结构化数据方面表现优异。相比其他算法,XGBoost能够处理大量特征和样本,并且支持通过正则化控制模型的复杂度。XGBoost也可以自动进行特征选择并对缺失值进行处理。

二、代码实现步骤

1、导入相关库

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.GBTRegressionModel, GBTRegressor;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SparkSession;

2、加载数据

SparkSession spark = SparkSession.builder().appName("XGBoost").master("local[*]").getOrCreate();
DataFrame data = spark.read().option("header", "true").option("inferSchema", "true").csv("data.csv");

3、准备特征向量

String[] featureCols = data.columns();
featureCols = Arrays.copyOfRange(featureCols, 0, featureCols.length - 1);
VectorAssembler assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features");
DataFrame inputData = assembler.transform(data).select("features", "output");
inputData.show(false);

4、划分训练集和测试集

double[] weights = 0.7, 0.3;
DataFrame[] splitData = inputData.randomSplit(weights);
DataFrame train = splitData[0];
DataFrame test = splitData[1];

5、定义XGBoost模型

GBTRegressor gbt = new GBTRegressor()
    .setLabelCol("output")
    .setFeaturesCol("features")
    .setMaxIter(100)
    .setStepSize(0.1)
    .setMaxDepth(6)
    .setLossType("squared")
    .setFeatureSubsetStrategy("auto");

6、构建管道

Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]gbt);

7、训练模型

GBTRegressionModel model = (GBTRegressionModel) pipeline.fit(train).stages()[0];

8、进行预测并评估模型

DataFrame predictions = model.transform(test);
predictions.show(false);

RegressionEvaluator evaluator = new RegressionEvaluator()
    .setMetricName("rmse")
    .setLabelCol("output")
    .setPredictionCol("prediction");

double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

以上就是Java语言中基于SparkML的XGBoost算法实现的示例代码。需要注意的是,这里使用了GBTRegressor作为XGBoost的实现方式,但是也可以使用其他实现方式,例如XGBoostRegressor或者XGBoostClassification。

三、完整代码

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.GBTRegressionModel, GBTRegressor;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;

public class XGBoostExample 

    public static void main(String[] args) 
        SparkSession spark = SparkSession.builder().appName("XGBoost").master("local[*]").getOrCreate();

        // 加载数据
        DataFrame data = spark.read().option("header", "true").option("inferSchema", "true").csv("data.csv");
        data.printSchema();
        data.show(false);

        // 准备特征向量
        String[] featureCols = data.columns();
        featureCols = Arrays.copyOfRange(featureCols, 0, featureCols.length - 1);
        VectorAssembler assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features");
        DataFrame inputData = assembler.transform(data).select("features", "output");
        inputData.show(false);

        // 划分训练集和测试集
        double[] weights = 0.7, 0.3;
        DataFrame[] splitData = inputData.randomSplit(weights);
        DataFrame train = splitData[0];
        DataFrame test = splitData[1];

        // 定义XGBoost模型
        GBTRegressor gbt = new GBTRegressor()
                .setLabelCol("output")
                .setFeaturesCol("features")
                .setMaxIter(100)
                .setStepSize(0.1)
                .setMaxDepth(6)
                .setLossType("squared")
                .setFeatureSubsetStrategy("auto");

        // 构建管道
        Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]gbt);

        // 训练模型
        GBTRegressionModel model = (GBTRegressionModel) pipeline.fit(train).stages()[0];

        // 进行预测并评估模型
        DataFrame predictions = model.transform(test);
        predictions.show(false);

        RegressionEvaluator evaluator = new RegressionEvaluator()
                .setMetricName("rmse")
                .setLabelCol("output")
                .setPredictionCol("prediction");

        double rmse = evaluator.evaluate(predictions);
        System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

        spark.stop();
    

在运行代码之前需要将数据文件data.csv放置到程序所在目录下,以便加载数据。另外,需要将代码中的相关路径和参数按照实际情况进行修改。 

Docker-Compose构建spark集群

0.前言

知道我的兄弟朋友们可能知道,我最近工作中,我们老大叫我利用Docker构建大数据开发环境。今天真的十分开心,我利用Docker-Compose构建出Kafka-Spark-MySQL的云平台大数据开发环境,并利用自己撰写的spark streaming程序实现了实时数据的处理。
本篇文章主要介绍利用Docker-Compose构建出Spark的运行环境,和前面的Kafka和MySQL相似,本次搭建的Spark集群还是采用Bitnami的镜像进行构建。

1.单节点的Spark的构建

下面是利用Docker-Compose构建的Spark的单节点,生产环境不建议这样配置。下面是单节点的配置文件

version: '2'

services:
  spark:
    image: docker.io/bitnami/spark:3.3
    ports:
      - '8080:8080'
    environment:
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1

2.Spark一主两从节点的构建

2.1Docker-Compose配置文件

下面是Spark一主两从的节点配置脚本

version: '2'

services:
  spark:
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - '8080:8080'
  spark-worker-1:
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
  spark-worker-2:
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no

2.2 环境配置参数

下面是Spark集群搭建的一些可用变量:

  • SPARK_MODE:集群模式启动 Apache Spark。有效值:主值、辅助角色。默认值:master
  • SPARK_MASTER_URL:工作人员可以找到主节点的 URL。仅当spark模式为辅助角色时才需要。默认值:spark://spark-master:7077
  • SPARK_RPC_AUTHENTICATION_ENABLED:启用 RPC 身份验证。默认值:no
  • SPARK_RPC_AUTHENTICATION_SECRET:用于 RPC 身份验证的密钥。无默认值。
  • SPARK_RPC_ENCRYPTION_ENABLED:启用 RPC 加密。默认值:no
  • SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED:启用本地存储加密:默false
  • SPARK_SSL_ENABLED:启用 SSL 配置。默认值:
  • SPARK_SSL_KEY_PASSWORD:密钥存储中私钥的密码。无默认值。
  • SPARK_SSL_KEYSTORE_FILE:密钥存储的位置。默认值:/opt/bitnami/spark/conf/certs/spark-keystore.jks.
  • SPARK_SSL_KEYSTORE_PASSWORD:密钥存储的密码。无默认值
  • SPARK_SSL_TRUSTSTORE_PASSWORD:信任存储的密码。无默认值。
  • SPARK_SSL_TRUSTSTORE_FILE:密钥存储的位置。默认值:/opt/bitnami/spark/conf/certs/spark-truststore.jks.
  • SPARK_SSL_NEED_CLIENT_AUTH:是否需要客户端身份验证。默认值:是
  • SPARK_SSL_PROTOCOL:要使用的 TLS 协议。默认值:TLSv1.2
  • SPARK_DAEMON_USER:容器以 root 身份启动时的 Apache Spark 系统用户。默认值:spark
  • SPARK_DAEMON_GROUP:容器以 root 身份启动时,Apache Spark 系统组。默认值:spark

以上是关于Java语言在Spark3.2.4集群中使用Spark MLlib库完成XGboost算法的主要内容,如果未能解决你的问题,请参考以下文章

如何保护 Angular 2 SPA 免受 XSS 攻击?

SPA 应该在哪里保存 OAuth 2.0 访问令牌?

Java目前主流框架有哪些?

java操作 redis集群

郑州-第七十四期前后端分离(SPA)与不分离(JSP) 工作流程分别是怎样的?

asp.net core中托管SPA应用