机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析

Posted 数据挖掘之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析相关的知识,希望对你有一定的参考价值。

作者 | 潘正兵

如今,人们出行的数字化程度越来越高,打车早已成为出行习惯。对运营公司来说,乘客打车产生的数据其实是很有价值的,可以做一系列分析辅助运营和决策。本案例就是利用uber乘客打车时产生的数据进行处理、建模和实战应用。目标是实现载客位置聚类分析,直观的展现出随着时间的变化,美国部分地区的uber乘客打车热力图以及热力图的聚类中心位置,聚类中心的确定可以给司机载客选址作为参考以及uber对于资源的投放策略的选取和其他分析等。


前面的案例中我们以数据抽取和处理以及建模为重点介绍了spark ml实现线性回归(监督学习)的实现。本案例为end2end机器学习项目,即:从数据ETL、模型训练、部署、预测、数据进行展现。侧重举例说明非监督学习在工程项目的全流程实现。实现的方式主要基于spark ml实现k-means聚类以及spark streaming +kafka对处理实时数据流。 

  
    
    
  
//业务数据样本:
2014-08-01 00:00:00,40.729,-73.9422,B02598
2014-08-01 00:00:00,40.7476,-73.9871,B02598
2014-08-01 00:00:00,40.7424,-74.0044,B02598
2014-08-01 00:00:00,40.751,-73.9869,B02598
2014-08-01 00:00:00,40.7406,-73.9902,B02598
2014-08-01 00:00:00,40.6994,-73.9591,B02617
//字段说明:
Date/Time | 乘客搭乘Uber的时间
Lat | 乘客搭乘Uber所在地的纬度
Lon | 乘客搭乘Uber所在地的经度
Base | TLC base公司代码


1

实现架构


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


1> 离线处理:数据(uber.cvs)放在hdfs上,作为模型训练的数据源;

2> 离线处理:启动spark BatchProcesing,加载数据、模型训练/评估、模型调优、模型保存;

3> 实时处理:启动StreamingProcessing服务,通过Spark streaming 加载模型,启动模型预测服务,开启kafka consumer实时监听业务端(即uber服务端)发送到topic1的增量数据;

4> 实时处理:模型算法对增量数据实时进行预测分析,给每一条uber乘客上车的位置进行聚类,分配聚类中心,将预测数据发送到topic2,用于后续业务处理;

5> Web Serving 服务消费topic2并解析,实时将预测聚类数据渲染到google地图上;

6> 联调和优化


2  

离线处理

从业务数据加载到模型保存


1 项目结构

机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


2 离线处理module的依赖

  
    
    
  
<dependencies>
    <!-- Spark dependencies -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
</dependencies>


3 数据准备
先在hdfs上创建存放csv文件和模型文件的目录:

hadoopfs -mkdir /user/hadoop-twq/uber-cluster
hadoopfs -mkdir /user/hadoop-twq/uber-cluster/saveModel


再将本地uber.csv文件上传到hdfs:

hadoopfs -copyFromLocal uber.csv /user/hadoop-twq/uber-cluster/uber.csv

hdfs webUi上查看下数据在哪些节点上,使用的数据没有超过一个block大小,所以只在一个节点上。


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


4 加载数据,生成原始DF


//根据字段情况增加schema:
val schema = StructType(Array(
  StructField("dt", TimestampType, true),
  StructField("lat", DoubleType, true),
  StructField("log", DoubleType, true),
  StructField("base", StringType, true)
))
val path = "hdfs://master:9999/user/hadoop-twq/uber-cluster/uber.csv"
val rawDF = spark.read.schema(schema).csv(path)
rawDF.printSchema()
rawDF.show()


运行,查看schema和数据情况:


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析

5 Dataframe转特征向量


 //先将rawDF缓存起来
rawDF.cache()
//将生成的特征向量装在一个列中
val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("lat""log"))
  .setOutputCol("features")


6 应用KMeans算法,迭代调参


//先将rawDF缓存起来
//定义一个KMeans  注意包路径:org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans()
   .setFeaturesCol("features"//设置特征向量列
   .setK(20//设置K值 根据经验 或者根据肘部法则选取,不断调整大小调优
   .setMaxIter(20//迭代次数 越多耗时越长 一般越精准

//将vectorAssembler和算法一起放在一个Pipeline中,自动顺序执行
val pipeline = new Pipeline().setStages(Array(vectorAssembler, kmeans))

//训练集和测试集 定为7:3
val Array(trainingData, testingData) = rawDF.randomSplit(Array(0.70.3))


7 Spark Kmeans实现源码案例浅析


这里有必要分析和参考一下Spark Kmeans的源码案例,案例文件为KMeansExample.scala,源码路径:
spark.example.src.main.scala.org.apache.spark.examples.ml


如下图,源码加载的文件sample_kmeans_data.txt,数据共6行,前三行值均为0.几,后三行是9.几,可以看出,聚类中心点个数(K值)适合设置为2,所以在案例代码中,K值设置的是2。


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


//根据字段情况增加schema:
val dataset =
  spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// Trains a k-means model.
val kmeans = new KMeans()
  .setK(2//设置K值为2
  .setSeed(1L//设置Seed值,用于随机初始化K-Means聚心点
val model = kmeans.fit(dataset)



关于K值和迭代次数的默认值,在源码中有定义


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


对于数据量大,或者无法直观判断k值大小的情况,可以采用肘部法则粗略设置K值。


肘部法则:使用各个簇内的样本点到所在簇聚心的距离平方和(SSE)作为性能度量,越小则说明各个类簇越收敛。当然不是说SSE越小越好,因为一种极端情况时将所有的样本点均视作簇,这样的话SSE为0,但是显然达不到分类的效果。我们要做的就是在类簇数量与SSE之间寻求一个平衡点。肘部法则为我们提供了这样的方法。


肘部法则实现:指定一个i值,即可能的最大类簇数。然后将类簇数从1开始递增,一直到i,计算出i个SSE。根据数据的潜在模式,当设定的类簇数不断逼近真实类簇数时,SSE呈现快速下降态势,而当设定类簇数超过真实类簇数时,SSE也会继续下降,但下降会迅速趋于缓慢。通过画出K-SSE曲线,找出下降途中的拐点,即可较好的确定K值。如图,为SSE-簇数变化示意图:


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


8 模型训练


当算法参数调整好后,便可进行模型训练了


val pipelineMode = pipeline.fit(trainingData)


9 模型预测


//模型预测
val predictions = pipelineMode.transform(testingData)


//查看预测之后的dataframe里面的数据情况
predictions.show()


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


10 模型初步评估


//通过轮廓系数评估模型好坏
val clusteringEvaluator = new ClusteringEvaluator()
val silhouette = clusteringEvaluator.evaluate(predictions)
println("轮廓系数:" + silhouette)


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


目前这个轮廓系数比1小很多,还可以通过调整K值和迭代次数,以提高轮廓系数更接近1,或者用交叉验证方式调参。这里就不展开了。

关于轮廓系数,这里可以简单了解下其用于评估模型优劣的原理。

在KMeans中,定义:

a(i):i向量到同一簇内其他点不相似程度的平均值

b(i) :i向量到其他簇的平均不相似程度的最小值

如图所示定义轮廓系数值S(i)算法:


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


将所有点的轮廓系数求平均,就是该聚类结果总的轮廓系数。可见轮廓系数的值是介于 [-1,1] ,S(i)越趋近于1代表内聚度和分离度都相对较优,模型也就更优。


回到代码实现逻辑中,我们此时可以得到每个聚类中心的特征值了:


// 得到每一个聚类中心的特征值
val kMeansModel = pipelineMode.stages.last.asInstanceOf[KMeansModel]
println("最后的聚类中心点:")
kMeansModel.clusterCenters.foreach(println)


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


11 模型保存


// 保存模型
kMeansModel.write.overwrite()
  .save("hdfs://master:9999/user/hadoop-twq/uber-cluster/saveModel")


执行完成后,查看hdfs中的模型存储目录,如下,可以看到生成了data和metadata两个子目录

机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


其中data中存放的是一个parquet模型文件,包括聚类中心点数据,如图:


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


metadata中是模型的元数据,包括算法的类型,spark的版本,模型生成的时间,以及一些默认的配置参数等,如图所示:


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


3

实时处理,从业务源数据到模型效果测试


1 实时处理-输入输出


首先,我们需要明确实时处理的输出数据和输出数据类型,我们设计输出数据为json格式方便后面解析,json中包括了原始的业务数据,已经预测的聚类点编号,聚类点经纬度。

下面是实时处理module的依赖和数据准备


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>com.twq</groupId>
    <artifactId>spark-kafka-connector</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>


//定义一个业务数据的case class 用于后面代码的解析
case class Uber(dt: String, lat: Double, log: Double, base: String)
//获取一行原始业务数据,按照逗号进行分割和解析,字段转换成对应字段类型
def parseUber(line: String): Uber 
= {
  val tmp = line.split(",")
  Uber(tmp(0), tmp(1).toDouble, tmp(2).toDouble, tmp(3))


2  实时处理-kmeans模型加载


接下来开始编写spark streaming程序,所有的spark应用的入口都是从SparkSession开始,这里不赘述。初始化sc后,初始化StreamingContext,并启动


// 初始化 StreamingContext,每隔2秒处理一次(batch )
val sc = new StreamingContext(spark.sparkContext, Seconds(2))
// 1、加载Kmeans模型数据
val kMeansModel = KMeansModel.load("data/savedmodel")
println("聚类中心点特征数据:")
kMeansModel.clusterCenters.foreach(println)
// 启动Streaming
println("APP Starting")
sc.start()
sc.awaitTermination()//spark streaming是一直运行的,直到中被中断


打印结果如下,说明加载模型正常


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


3 实时处理-direct模式消费kafka

加载模型后,接下来则是spark streaming消费kafka数据,则我们需要写一个inputDStreams, sparkStreaming集成kafka有两种模式,基于Receiver和Direct方式,后面我们主要通过Direct方式实现,原因是简单、且能保证数据不会丢失。代码中做了详细注释。


// 2. 使用direct方式消费kafka中的数据
//先定义kafka消费参数
val kafkaParams = Map[String, String](
  //comsumer broker
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "master:9092",
  //消费者groupID
  ConsumerConfig.GROUP_ID_CONFIG -> "RealtimeUberPredictor",
  //key的反序列化机制采用自带的StringDeserializer
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
    "org.apache.kafka.common.serialization.StringDeserializer",
  //从哪个offset开始消费消息
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  //value的反序列化机制采用自带的StringDeserializer
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
    "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
  //消费者轮询有个time out时间,如果这段时间没有消息,则不去拉取数据  单位ms
  "spark.streaming.kafka.consumer.poll.ms" -> "10000"
)

// ConsumerStrategies的取值:
//    Assign 指定消费特定的Topic的Partition数据
//    Subscribe 订阅好几个topic的数据
//    SubscribePattern 采用正则表达式匹配topic的名称,如果匹配上的话则订阅该topic的数据
//注意:这里consumerStrategy的Subscribe方法需要指定key value 类型
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](Array("uber"), kafkaParams)

// LocationStrategies的取值:
//    PreferBrokers 如果Kafka的broker和Spark Streaming的Executor在同一台机器上的话则选择这个
//    PreferConsistent 一般都是使用这个,使用这种方式可以保证Kafka Topic的分区数据均匀分布在所有的Spark Streaming executors上
//    PreferFixed 将指定的Topic的Partition的数据放到指定的机器上进行处理

val inputDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  sc, LocationStrategies.PreferConsistent, consumerStrategy)

//输入的Dstream只需要拿出value值即可
val rawDStream: DStream[String] = inputDStream.map(_.value())


4 实时处理-原始数据解析


上一步,我们得到的是一个String类型的DStream,接下来处理原生数据,也就是将业务数据:

2014-08-01 00:00:00,40.729,-73.9422,B02598

转换成:

{"dt":"2014-08-01 00:00:00""lat":40.729"log":-73.9422
"base":"B02598""cluster":1"clat":99.729"clog":-103.9422}


然后发往第二个kafka topic,这里两个json的转换,采用spark sql更容易些,代码如下:


// 3、处理接收到的消息
rawDStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val sparkSession = SparkSession.builder()
      //这里是将上面的父sparkSession给到当前sparkSession
      .config(spark.sparkContext.getConf)
      .getOrCreate()

    import sparkSession.implicits._
    // 将原始的字符串数据转成Uber类型的数据,进而转成DataFrame
    val uberRDD: RDD[Uber] = rdd.map(parseUber)
    //RDD转DF需要将隐式转换导入,后面的操作基于df更便捷
    val uberDF = uberRDD.toDF
  }
}


5 实时处理-模型预测


得到df后,则可以通过模型进行预测,这也是要将处理后的RDD转成df的原因。


// 预测聚类中心
// 生成特征向量
val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("lat""log"))
  .setOutputCol("features")
val featuresDF = vectorAssembler.transform(uberDF)
// 做预测
val predictions = kMeansModel.transform(featuresDF)
predictions.show()


6 实时处理-生成Json结果数据


val uberDFWithPrediction = predictions
  .select($"dt", $"lat", $"log", $"base", $"prediction".as("cid"))
case class ClusterCenter(cid: Integer, clat: Double, clog: Double)
val centers 
= kMeansModel.clusterCenters.zipWithIndex.map { case (center, index) =>
  ClusterCenter(index, center(0), center(1))
}
val centersDF = spark.sparkContext.parallelize(centers).toDF

val finalDF = uberDFWithPrediction
  .join(centersDF, Seq("cid"))//根据cid进行关联
  .orderBy($"dt")  //根据dt进行升序排

// 将结果数据转成Json类型的Dataset
val jsonDS: Dataset[String] = finalDF.toJSON


7 实时处理-json数据发往另一个kafka topic


这里有个问题,用SparkStreaming读kafka数据可以通过api实现,但sparkrdd数据如何写入kafka呢。这里是通过在gethub上下载spark-kafka-connector组件,里面有rdd.sendToKafkaAPI可以直接调用。其实现逻辑是,一个rdd包含若干个分区,每个分区分别启一个kafkaProducer,将该分区的数据发送到这个topic中。这里在源码中增加了一个类RDDKafkaWriter,用于将指定RDD数据写入Kafka中


// 在每一个RDD分区中执行如下操作:
rdd.foreachPartition { partition =>
  // 1、获取KafkaProducer实例
  val producer: KafkaProducer[K, V] = KafkaProducerFactory.getOrCreateProducer(producerConfig)

  // 2、打印debug级别的log信息
  // 获取当前Spark的task所在的Context(上下文)
  val context = TaskContext.get
  // 打印信息:Spark的task所在的分区id对应的Kafka的topic
  Logger.getLogger(getClass).debug(s"Send Spark partition: ${context.partitionId()} to Kafka topic : $topic")

  // 3、初始化Kafka发送消息的异常处理类
  val callback = new KafkaRDDSinkExceptionHandler

  // 4、将当前分区中的每一条数据转换成ProducerRecord,然后发向Kafka
  // 注意:需要处理异常
  partition.map(transformFunc).foreach { record =>
    callback.throwExceptionIfAny()
    producer.send(record, callback)


实现RDD发数据给kafka后,我们直接将该moduleinstall,并在我们实时处理model的pom中添加本地依赖,调用该方法。


<dependency>
    <groupId>com.twq</groupId>
    <artifactId>spark-kafka-connector</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>


最后封装下kafka topic 


val producerConfig = Map(
  "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
  "bootstrap.servers" -> "master:9092",
  "batch.size" -> "10"
)
import com.twq.RDDKafkaWriter._
//这里可以直接sendToKafka 隐式转换腻害了
jsonDS.rdd.sendToKafka(producerConfig, "uberp",
  (line: String) => new ProducerRecord[Any, String]("uberp", line))


4

Web Serving,服务端和客户端实现


上面我们将数据以json格式实时发送到topic:uberp中,接下来我们需要实现web server,实现将uberp中数据实时发送到客户端浏览器,绘制热力图。我们采用Vert.x来实现对实时数据的处理。Vert.x是一个事件驱动的框架,最大的特点就在于异步(底层基于Netty),通过事件循环(EventLoop)来调起存储在异步任务队列(CallBackQueue)中的任务,大大降低了传统阻塞模型中线程对于操作系统的开销,所以相比spring同步阻塞来说,性能会更好。不过Vert.x学习起来有点难。

于是这里的前后端的数据交互,可以采用Vert.x的eventbus来实现:服务端消费kafka数据并推送到客户端浏览器上。eventbus这里作为消息通道,可以启动一个server,该server通过Vert.x实现,跑在JVM上,server创建一个vertx实例,通过调用vertx的相应API,实时的将topic数据封装并推送到eventbus中,客户端采用SockJS实时的从eventbus中取数据并绘制热力图进行展现。所以我们需要写对应的服务端和客户端代码实现以上功能。

server端的主要实现:

if (args.length != 2) {
            throw new IllegalArgumentException("Must have the HtttPort and Topic :  8080 uberp  ");
        }
        httpPort = Integer.parseInt(args[0]);
        uberTopic = args[1];
        // 1、 初始化并启动Vert.x服务
        // 1.1 创建一个Vertx实例,用于访问Vert.x的核心API
        Vertx vertx = Vertx.vertx();
        // 1.2 创建一个Router对象,用于将请求绑定到特定的处理类中
        Router router = Router.router(vertx);
        // 1.3 将请求url中uri是 /eventbus/* 的请求绑定到SockJSHandler处理类
        // SockJSHandler处理类其实就是event bus, 将消息从服务端推送到客户端
        BridgeOptions options = new BridgeOptions();
        // 设置将 "dashboard" 类型的消息推送到客户端
        options.setOutboundPermitted(Collections.singletonList(new PermittedOptions().setAddress("dashboard")));
        router.route("/eventbus/*").handler(SockJSHandler.create(vertx).bridge(options));
        // 1.4 其他的请求则绑定到StaticHandler处理类中
        //  StaticHandler 处理处理静态资源的请求 
        router.route().handler(StaticHandler.create().setCachingEnabled(false));

        // 1.5 创建一个http服务
        HttpServer httpServer = vertx.createHttpServer();
        // 注册请求处理类并且绑定监听指定端口
        httpServer.requestHandler(router::accept).listen(httpPort, ar -> {
            if (ar.succeeded()) {
                System.out.println("Http server started started on port " + httpPort);
            } else {
                ar.cause().printStackTrace();
            }
        });

        // 2. 启动 Kafka 消费者消费第二个topic中的数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers""master:9092");
        properties.setProperty("group.id""vertxdashboard");
        properties.setProperty("enable.auto.commit""true");
        properties.setProperty("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Arrays.asList(uberTopic));
        System.out.println("consume from Kafka Topic " + uberTopic + " publish to eventbus dashboard");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
                // 将消费的消息通过event bus push到客户端
                vertx.eventBus().publish("dashboard", record.value());
                System.out.println(record.value());
            }
        }

客户端index.html中主要是js代码,实现从服务端实时拉取数据、解析数据,调用google地图API,绘制热力图。js依赖如下:
<!-- 引入vertx依赖 -->
<script src="js/vertx/sockjs.min.js"></script>
<script src="js/vertx/vertxbus.js"></script>
<!-- 引入 Google Maps -->
<script async defer src="https://maps.googleapis.com/maps/api/js?key=YOU-KEY&libraries=visualization&callback=initMap"
      type="text/javascript"></script>

<!-- jquery -->
<script src="js/jquery/jquery.min.js"></script>
<script src="js/jquery/jquery-ui-widget.min.js"></script>
<script src="js/bootstrap/js/bootstrap.min.js"></script>

js依赖中YOU-KEY需要在google上申请。由于google在大陆被墙,所以,你懂的,这里花了不少时间搞通。具体的实现我就不说了,也不是重点,网上可以查下js如何实现以上内容。

5

项目联调与报错解决


1 启动kafka,创建上面所提的两个topic

~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic uber
~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic uberp

2 模拟滴滴业务端,实时向uber topic发送乘客打车数据

写一个java类DataGenerator,实现从本地读取一个csv文件,文件中存有与我们模型训练相同数据格式的乘客打车数据。对数据一行一行读取,封装成json格式数据发送到topic,如下为主要代码:

public static void main(String[] args) throws IOException, InterruptedException {
    //判断有没有传参,main函数传参在run/Edit Conficonfiguration/Conficonfiguration/programme arguments里面 
    // 默认空格隔开多个参数
    //判断有没有传参数,传的分别是topic:uber和文件定位:data/history/uber.csv
    if (args.length != 2) {
        throw new IllegalArgumentException("Must have the topic and file parameter :  DataGenerator /apps/iot_stream:ecg /data/ecg.tsv ");
    }
    topic = args[0];
    fileName = args[1];
    System.out.println("Sending to topic " + topic);
    //该方法实现配置kafka broker信息 key和value的序列化器,很简单,不展开
    configureProducer();
    //本地读取文件
    File f = new File(fileName);
    FileReader fr = new FileReader(f);
    BufferedReader reader = new BufferedReader(fr);
    String line = reader.readLine();
    while (line != null) {
        //一行一行读取 并调方法发送给topic uber
        generateKVandPostMessage(line);
        line = reader.readLine();
    }
    //记得关闭producer
    producer.close();
    System.out.println("All done.");
    System.exit(1);
}
实现往uber topic发数据的方法代码:
private static void generateKVandPostMessage(String line) throws InterruptedException {
    System.out.println(line);
    //生成并发送记录到topic ,partition key  producer tickInterval都是在主类中定义的static对象
    ProducerRecord<String, String> rec = new ProducerRecord<>(topic, partition, key, line);
    producer.send(rec);
    //这里调节数据发送的速度
    Thread.sleep(tickInterval);
}

分别启动WebServer, RealtimeUberPredictor:

机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析

机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析

最后启动DataGenerator,也就是水源头开始开闸放水。数据实时的发送到uber topic中

机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


uberp topic中的预测数据也在实时产生。从生成的时间戳来看,相差了三秒钟,应该是预测模型进行预测以及前后的数据梳理一共花了三秒。(目前3秒有点长,应该还可以优化)

机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


然后直接在googl浏览器打开index.htm


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


3 数据可视化效果(gif)


机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析


这里红色编号为预测的聚类中心,旁边浅蓝和深红色为热度值地图展示。由于我的google API 没有绑定支付账户,被打了水印。


4 相关报错



1> 报错java客户端用户无法写文件到hdfs目录中




解决:修改hdfs-site.xml中设置dfs.permissions.enabled为false


2> RealtimeUberPredictor编译报错缺失spark-kafka-connector依赖


解决:先把spark-kafka-connector编译通过,install一下,再重新编译RealtimeUberPredictor 通过


3>  RealtimeUberPredictor运行报错:Scala compiler JARsnot found


解决:没有任何的Dstream操作,必须要添加action操作结束


4>  kafkatool能连接kafka集群,但是一个broker都没有



解决: 原因是zk为kafka创建的临时节点中没有写入broker的id信息(应该是有三个节点的[0,1,2]),用安全模式启动后,就会正常创建。
另外,非安全模式启动kafka时,有的节点broker会自动挂掉。


5> google地图无法展示热力图,报错



解决:js 集成google 地图 API 的key失效,需要翻墙重新在google地图上申请。


----------------完。 欢迎交流-------------------------









以上是关于机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析的主要内容,如果未能解决你的问题,请参考以下文章

ML机器学习|KMeans聚类算法|EM算法

4.Spark ML学习笔记—Spark ML决策树 (应用案例)随机森林GBDT算法ML 树模型参数详解 (本篇概念多)

用 PySpark ML 构建机器学习模型

PySpark ML——分布式机器学习库

如何在 Spark ML 中实现 Kmeans 评估器

Spark ML Pipeline简介