机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析
Posted 数据挖掘之路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析相关的知识,希望对你有一定的参考价值。
如今,人们出行的数字化程度越来越高,打车早已成为出行习惯。对运营公司来说,乘客打车产生的数据其实是很有价值的,可以做一系列分析辅助运营和决策。本案例就是利用uber乘客打车时产生的数据进行处理、建模和实战应用。目标是实现载客位置聚类分析,直观的展现出随着时间的变化,美国部分地区的uber乘客打车热力图以及热力图的聚类中心位置,聚类中心的确定可以给司机载客选址作为参考以及uber对于资源的投放策略的选取和其他分析等。
//业务数据样本:
2014-08-01 00:00:00,40.729,-73.9422,B02598
2014-08-01 00:00:00,40.7476,-73.9871,B02598
2014-08-01 00:00:00,40.7424,-74.0044,B02598
2014-08-01 00:00:00,40.751,-73.9869,B02598
2014-08-01 00:00:00,40.7406,-73.9902,B02598
2014-08-01 00:00:00,40.6994,-73.9591,B02617
//字段说明:
Date/Time | 乘客搭乘Uber的时间
Lat | 乘客搭乘Uber所在地的纬度
Lon | 乘客搭乘Uber所在地的经度
Base | TLC base公司代码
1
实现架构
1> 离线处理:数据(uber.cvs)放在hdfs上,作为模型训练的数据源;
2> 离线处理:启动spark BatchProcesing,加载数据、模型训练/评估、模型调优、模型保存;
3> 实时处理:启动StreamingProcessing服务,通过Spark streaming 加载模型,启动模型预测服务,开启kafka consumer实时监听业务端(即uber服务端)发送到topic1的增量数据;
4> 实时处理:模型算法对增量数据实时进行预测分析,给每一条uber乘客上车的位置进行聚类,分配聚类中心,将预测数据发送到topic2,用于后续业务处理;
5> Web Serving 服务消费topic2并解析,实时将预测聚类数据渲染到google地图上;
6> 联调和优化。
2
离线处理
从业务数据加载到模型保存
1 项目结构
2 离线处理module的依赖
<dependencies>
<!-- Spark dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
hadoopfs -mkdir /user/hadoop-twq/uber-cluster
hadoopfs -mkdir /user/hadoop-twq/uber-cluster/saveModel
hadoopfs -copyFromLocal uber.csv /user/hadoop-twq/uber-cluster/uber.csv
hdfs webUi上查看下数据在哪些节点上,使用的数据没有超过一个block大小,所以只在一个节点上。
4 加载数据,生成原始DF
//根据字段情况增加schema:
val schema = StructType(Array(
StructField("dt", TimestampType, true),
StructField("lat", DoubleType, true),
StructField("log", DoubleType, true),
StructField("base", StringType, true)
))
val path = "hdfs://master:9999/user/hadoop-twq/uber-cluster/uber.csv"
val rawDF = spark.read.schema(schema).csv(path)
rawDF.printSchema()
rawDF.show()
运行,查看schema和数据情况:
5 Dataframe转特征向量
//先将rawDF缓存起来
rawDF.cache()
//将生成的特征向量装在一个列中
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("lat", "log"))
.setOutputCol("features")
6 应用KMeans算法,迭代调参
//先将rawDF缓存起来
//定义一个KMeans 注意包路径:org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans()
.setFeaturesCol("features") //设置特征向量列
.setK(20) //设置K值 根据经验 或者根据肘部法则选取,不断调整大小调优
.setMaxIter(20) //迭代次数 越多耗时越长 一般越精准
//将vectorAssembler和算法一起放在一个Pipeline中,自动顺序执行
val pipeline = new Pipeline().setStages(Array(vectorAssembler, kmeans))
//训练集和测试集 定为7:3
val Array(trainingData, testingData) = rawDF.randomSplit(Array(0.7, 0.3))
7 Spark Kmeans实现源码案例浅析
如下图,源码加载的文件sample_kmeans_data.txt,数据共6行,前三行值均为0.几,后三行是9.几,可以看出,聚类中心点个数(K值)适合设置为2,所以在案例代码中,K值设置的是2。
//根据字段情况增加schema:
val dataset =
spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// Trains a k-means model.
val kmeans = new KMeans()
.setK(2) //设置K值为2
.setSeed(1L) //设置Seed值,用于随机初始化K-Means聚心点
val model = kmeans.fit(dataset)
关于K值和迭代次数的默认值,在源码中有定义
对于数据量大,或者无法直观判断k值大小的情况,可以采用肘部法则粗略设置K值。
8 模型训练
当算法参数调整好后,便可进行模型训练了
val pipelineMode = pipeline.fit(trainingData)
9 模型预测
//模型预测
val predictions = pipelineMode.transform(testingData)
//查看预测之后的dataframe里面的数据情况
predictions.show()
10 模型初步评估
//通过轮廓系数评估模型好坏
val clusteringEvaluator = new ClusteringEvaluator()
val silhouette = clusteringEvaluator.evaluate(predictions)
println("轮廓系数:" + silhouette)
关于轮廓系数,这里可以简单了解下其用于评估模型优劣的原理。
在KMeans中,定义:
a(i):i向量到同一簇内其他点不相似程度的平均值
b(i) :i向量到其他簇的平均不相似程度的最小值
如图所示定义轮廓系数值S(i)算法:
回到代码实现逻辑中,我们此时可以得到每个聚类中心的特征值了:
// 得到每一个聚类中心的特征值
val kMeansModel = pipelineMode.stages.last.asInstanceOf[KMeansModel]
println("最后的聚类中心点:")
kMeansModel.clusterCenters.foreach(println)
11 模型保存
// 保存模型
kMeansModel.write.overwrite()
.save("hdfs://master:9999/user/hadoop-twq/uber-cluster/saveModel")
其中data中存放的是一个parquet模型文件,包括聚类中心点数据,如图:
metadata中是模型的元数据,包括算法的类型,spark的版本,模型生成的时间,以及一些默认的配置参数等,如图所示:
3
实时处理,从业务源数据到模型效果测试
1 实时处理-输入输出
首先,我们需要明确实时处理的输出数据和输出数据类型,我们设计输出数据为json格式方便后面解析,json中包括了原始的业务数据,已经预测的聚类点编号,聚类点经纬度。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>com.twq</groupId>
<artifactId>spark-kafka-connector</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
//定义一个业务数据的case class 用于后面代码的解析
case class Uber(dt: String, lat: Double, log: Double, base: String)
//获取一行原始业务数据,按照逗号进行分割和解析,字段转换成对应字段类型
def parseUber(line: String): Uber = {
val tmp = line.split(",")
Uber(tmp(0), tmp(1).toDouble, tmp(2).toDouble, tmp(3))
2 实时处理-kmeans模型加载
// 初始化 StreamingContext,每隔2秒处理一次(batch )
val sc = new StreamingContext(spark.sparkContext, Seconds(2))
// 1、加载Kmeans模型数据
val kMeansModel = KMeansModel.load("data/savedmodel")
println("聚类中心点特征数据:")
kMeansModel.clusterCenters.foreach(println)// 启动Streaming
println("APP Starting")
sc.start()
sc.awaitTermination()//spark streaming是一直运行的,直到中被中断
打印结果如下,说明加载模型正常
// 2. 使用direct方式消费kafka中的数据
//先定义kafka消费参数
val kafkaParams = Map[String, String](
//comsumer broker
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "master:9092",
//消费者groupID
ConsumerConfig.GROUP_ID_CONFIG -> "RealtimeUberPredictor",
//key的反序列化机制采用自带的StringDeserializer
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
//从哪个offset开始消费消息
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
//value的反序列化机制采用自带的StringDeserializer
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
//消费者轮询有个time out时间,如果这段时间没有消息,则不去拉取数据 单位ms
"spark.streaming.kafka.consumer.poll.ms" -> "10000"
)
// ConsumerStrategies的取值:
// Assign 指定消费特定的Topic的Partition数据
// Subscribe 订阅好几个topic的数据
// SubscribePattern 采用正则表达式匹配topic的名称,如果匹配上的话则订阅该topic的数据
//注意:这里consumerStrategy的Subscribe方法需要指定key value 类型
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](Array("uber"), kafkaParams)
// LocationStrategies的取值:
// PreferBrokers 如果Kafka的broker和Spark Streaming的Executor在同一台机器上的话则选择这个
// PreferConsistent 一般都是使用这个,使用这种方式可以保证Kafka Topic的分区数据均匀分布在所有的Spark Streaming executors上
// PreferFixed 将指定的Topic的Partition的数据放到指定的机器上进行处理
val inputDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
sc, LocationStrategies.PreferConsistent, consumerStrategy)
//输入的Dstream只需要拿出value值即可
val rawDStream: DStream[String] = inputDStream.map(_.value())
4 实时处理-原始数据解析
上一步,我们得到的是一个String类型的DStream,接下来处理原生数据,也就是将业务数据:
2014-08-01 00:00:00,40.729,-73.9422,B02598
转换成:
{"dt":"2014-08-01 00:00:00", "lat":40.729, "log":-73.9422,
"base":"B02598", "cluster":1, "clat":99.729, "clog":-103.9422}
然后发往第二个kafka topic,这里两个json的转换,采用spark sql更容易些,代码如下:
// 3、处理接收到的消息
rawDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val sparkSession = SparkSession.builder()
//这里是将上面的父sparkSession给到当前sparkSession
.config(spark.sparkContext.getConf)
.getOrCreate()
import sparkSession.implicits._
// 将原始的字符串数据转成Uber类型的数据,进而转成DataFrame
val uberRDD: RDD[Uber] = rdd.map(parseUber)
//RDD转DF需要将隐式转换导入,后面的操作基于df更便捷
val uberDF = uberRDD.toDF
}
}
5 实时处理-模型预测
得到df后,则可以通过模型进行预测,这也是要将处理后的RDD转成df的原因。
// 预测聚类中心
// 生成特征向量
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("lat", "log"))
.setOutputCol("features")
val featuresDF = vectorAssembler.transform(uberDF)
// 做预测
val predictions = kMeansModel.transform(featuresDF)
predictions.show()
6 实时处理-生成Json结果数据
val uberDFWithPrediction = predictions
.select($"dt", $"lat", $"log", $"base", $"prediction".as("cid"))
case class ClusterCenter(cid: Integer, clat: Double, clog: Double)
val centers = kMeansModel.clusterCenters.zipWithIndex.map { case (center, index) =>
ClusterCenter(index, center(0), center(1))
}
val centersDF = spark.sparkContext.parallelize(centers).toDF
val finalDF = uberDFWithPrediction
.join(centersDF, Seq("cid"))//根据cid进行关联
.orderBy($"dt") //根据dt进行升序排
// 将结果数据转成Json类型的Dataset
val jsonDS: Dataset[String] = finalDF.toJSON
7 实时处理-json数据发往另一个kafka topic
// 在每一个RDD分区中执行如下操作:
rdd.foreachPartition { partition =>
// 1、获取KafkaProducer实例
val producer: KafkaProducer[K, V] = KafkaProducerFactory.getOrCreateProducer(producerConfig)
// 2、打印debug级别的log信息
// 获取当前Spark的task所在的Context(上下文)
val context = TaskContext.get
// 打印信息:Spark的task所在的分区id对应的Kafka的topic
Logger.getLogger(getClass).debug(s"Send Spark partition: ${context.partitionId()} to Kafka topic : $topic")
// 3、初始化Kafka发送消息的异常处理类
val callback = new KafkaRDDSinkExceptionHandler
// 4、将当前分区中的每一条数据转换成ProducerRecord,然后发向Kafka
// 注意:需要处理异常
partition.map(transformFunc).foreach { record =>
callback.throwExceptionIfAny()
producer.send(record, callback)
实现RDD发数据给kafka后,我们直接将该moduleinstall,并在我们实时处理model的pom中添加本地依赖,调用该方法。
<dependency>
<groupId>com.twq</groupId>
<artifactId>spark-kafka-connector</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
最后封装下kafka topic
val producerConfig = Map(
"key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
"bootstrap.servers" -> "master:9092",
"batch.size" -> "10"
)
import com.twq.RDDKafkaWriter._
//这里可以直接sendToKafka 隐式转换腻害了
jsonDS.rdd.sendToKafka(producerConfig, "uberp",
(line: String) => new ProducerRecord[Any, String]("uberp", line))
4
Web Serving,服务端和客户端实现
if (args.length != 2) {
throw new IllegalArgumentException("Must have the HtttPort and Topic : 8080 uberp ");
}
httpPort = Integer.parseInt(args[0]);
uberTopic = args[1];
// 1、 初始化并启动Vert.x服务
// 1.1 创建一个Vertx实例,用于访问Vert.x的核心API
Vertx vertx = Vertx.vertx();
// 1.2 创建一个Router对象,用于将请求绑定到特定的处理类中
Router router = Router.router(vertx);
// 1.3 将请求url中uri是 /eventbus/* 的请求绑定到SockJSHandler处理类
// SockJSHandler处理类其实就是event bus, 将消息从服务端推送到客户端
BridgeOptions options = new BridgeOptions();
// 设置将 "dashboard" 类型的消息推送到客户端
options.setOutboundPermitted(Collections.singletonList(new PermittedOptions().setAddress("dashboard")));
router.route("/eventbus/*").handler(SockJSHandler.create(vertx).bridge(options));
// 1.4 其他的请求则绑定到StaticHandler处理类中
// StaticHandler 处理处理静态资源的请求
router.route().handler(StaticHandler.create().setCachingEnabled(false));
// 1.5 创建一个http服务
HttpServer httpServer = vertx.createHttpServer();
// 注册请求处理类并且绑定监听指定端口
httpServer.requestHandler(router::accept).listen(httpPort, ar -> {
if (ar.succeeded()) {
System.out.println("Http server started started on port " + httpPort);
} else {
ar.cause().printStackTrace();
}
});
// 2. 启动 Kafka 消费者消费第二个topic中的数据
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "master:9092");
properties.setProperty("group.id", "vertxdashboard");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(uberTopic));
System.out.println("consume from Kafka Topic " + uberTopic + " publish to eventbus dashboard");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200);
for (ConsumerRecord<String, String> record : records) {
// 将消费的消息通过event bus push到客户端
vertx.eventBus().publish("dashboard", record.value());
System.out.println(record.value());
}
}
<!-- 引入vertx依赖 -->
<script src="js/vertx/sockjs.min.js"></script>
<script src="js/vertx/vertxbus.js"></script>
<!-- 引入 Google Maps -->
<script async defer src="https://maps.googleapis.com/maps/api/js?key=YOU-KEY&libraries=visualization&callback=initMap"
type="text/javascript"></script>
<!-- jquery -->
<script src="js/jquery/jquery.min.js"></script>
<script src="js/jquery/jquery-ui-widget.min.js"></script>
<script src="js/bootstrap/js/bootstrap.min.js"></script>
5
项目联调与报错解决
~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic uber
~/bigdata/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic uberp
public static void main(String[] args) throws IOException, InterruptedException {
//判断有没有传参,main函数传参在run/Edit Conficonfiguration/Conficonfiguration/programme arguments里面
// 默认空格隔开多个参数
//判断有没有传参数,传的分别是topic:uber和文件定位:data/history/uber.csv
if (args.length != 2) {
throw new IllegalArgumentException("Must have the topic and file parameter : DataGenerator /apps/iot_stream:ecg /data/ecg.tsv ");
}
topic = args[0];
fileName = args[1];
System.out.println("Sending to topic " + topic);
//该方法实现配置kafka broker信息 key和value的序列化器,很简单,不展开
configureProducer();
//本地读取文件
File f = new File(fileName);
FileReader fr = new FileReader(f);
BufferedReader reader = new BufferedReader(fr);
String line = reader.readLine();
while (line != null) {
//一行一行读取 并调方法发送给topic uber
generateKVandPostMessage(line);
line = reader.readLine();
}
//记得关闭producer
producer.close();
System.out.println("All done.");
System.exit(1);
}
实现往uber topic发数据的方法代码:
private static void generateKVandPostMessage(String line) throws InterruptedException {
System.out.println(line);
//生成并发送记录到topic ,partition key producer tickInterval都是在主类中定义的static对象
ProducerRecord<String, String> rec = new ProducerRecord<>(topic, partition, key, line);
producer.send(rec);
//这里调节数据发送的速度
Thread.sleep(tickInterval);
}
然后直接在googl浏览器打开index.htm
3 数据可视化效果(gif)
4 相关报错
1> 报错java客户端用户无法写文件到hdfs目录中
解决:修改hdfs-site.xml中设置dfs.permissions.enabled为false
2> RealtimeUberPredictor编译报错缺失spark-kafka-connector依赖
解决:先把spark-kafka-connector编译通过,install一下,再重新编译RealtimeUberPredictor 通过
3> RealtimeUberPredictor运行报错:Scala compiler JARsnot found
解决:没有任何的Dstream操作,必须要添加action操作结束
4> kafkatool能连接kafka集群,但是一个broker都没有
5> google地图无法展示热力图,报错
解决:js 集成google 地图 API 的key失效,需要翻墙重新在google地图上申请。
----------------完。 欢迎交流-------------------------
以上是关于机器学习案例2-基于Spark ml KMeans实现uber载客位置聚类分析的主要内容,如果未能解决你的问题,请参考以下文章
4.Spark ML学习笔记—Spark ML决策树 (应用案例)随机森林GBDT算法ML 树模型参数详解 (本篇概念多)