大数据Spark物联网设备数据分析
Posted 赵广陆
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据Spark物联网设备数据分析相关的知识,希望对你有一定的参考价值。
目录
1 设备监控数据
在物联网时代,大量的感知器每天都在收集并产生着涉及各个领域的数据。物联网提供源源不断的数据流,使实时数据分析成为分析数据的理想工具。
模拟一个智能物联网系统的数据统计分析,产生设备数据发送到Kafka,结构化流Structured
Streaming实时消费统计。对物联网设备状态信号数据,实时统计分析:
- 1)、信号强度大于30的设备;
- 2)、各种设备类型的数量;
- 3)、各种设备类型的平均信号强度;
编写程序模拟生成物联网设备监控数据,发送到Kafka Topic中,此处为了演示字段较少,实际
生产项目中字段很多。
1.1 创建 Topic
启动Kafka Broker服务,创建Topic【search-log-topic】,命令如下所示:
# 启动Zookeeper
/export/server/zookeeper/bin/zkServer.sh start
# 启动Kafka Broker
/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
rm -rf /export/server/kafka/logs/*
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-fa
ctor 1 --partitions 3 --topic iotTopic
# 模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic iotTopic
# 模拟消费者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic iotTopic
--from-beginning
# 删除topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic iotTopic
1.2 模拟数据
模拟设备监控日志数据,字段信息封装到CaseClass样例类【DeviceData】类,代码如下
package cn.itcast.spark.iot
/**
* 物联网设备发送状态数据
*
* @param device 设备标识符ID
* @param deviceType 设备类型,如服务器mysql, redis, kafka或路由器route
* @param signal 设备信号
* @param time 发送数据时间
*/
case class DeviceData(
device: String, //
deviceType: String, //
signal: Double, //
time: Long //
)
模拟产生日志数据类【MockIotDatas】具体代码如下:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json
import scala.util.Random
object MockIotDatas {
def main(args: Array[String]): Unit = {
// 发送Kafka Topic
val props = new Properties()
props.put("bootstrap.servers", "node1.itcast.cn:9092")
props.put("acks", "1")
props.put("retries", "3")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](props)
val deviceTypes = Array(
"db", "bigdata", "kafka", "route", "bigdata", "db", "bigdata", "bigdata", "bigdata"
)
val random: Random = new Random()
while (true) {
val index: Int = random.nextInt(deviceTypes.length)
val deviceId: String = s"device_${(index + 1) * 10 + random.nextInt(index + 1)}"
val deviceType: String = deviceTypes(index)
val deviceSignal: Int = 10 + random.nextInt(90)
// 模拟构造设备数据
val deviceData = DeviceData(deviceId, deviceType, deviceSignal, System.currentTimeMillis())
// 转换为JSON字符串
val deviceJson: String = new Json(org.json4s.DefaultFormats).write(deviceData)
println(deviceJson)
Thread.sleep(100 + random.nextInt(500))
val record = new ProducerRecord[String, String]("iotTopic", deviceJson)
producer.send(record)
}
// 关闭连接
producer.close()
}
}
相当于大机房中各个服务器定时发送相关监控数据至Kafka中,服务器部署服务有数据库db、大
数据集群bigdata、消息队列kafka及路由器route等等,数据样本:
{"device":"device_50","deviceType":"bigdata","signal":91.0,"time":1590660338429}
{"device":"device_20","deviceType":"bigdata","signal":17.0,"time":1590660338790}
{"device":"device_32","deviceType":"kafka","signal":93.0,"time":1590660338908}
{"device":"device_82","deviceType":"bigdata","signal":72.0,"time":1590660339380}
{"device":"device_32","deviceType":"kafka","signal":10.0,"time":1590660339972}
{"device":"device_96","deviceType":"bigdata","signal":18.0,"time":1590660343554}
2 基于DataFrame分析
按照业务需求,从Kafka消费日志数据,基于DataFrame数据结构调用函数分析,代码如下:
package cn.itcast.spark.iot
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 对物联网设备状态信号数据,实时统计分析:
* 1)、信号强度大于30的设备
* 2)、各种设备类型的数量
* 3)、各种设备类型的平均信号强度
*/
object IotStreamingOnline {
def main(args: Array[String]): Unit = {
// 1. 构建SparkSession会话实例对象,设置属性信息
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.config("spark.sql.shuffle.partitions", "3")
.getOrCreate()
// 导入隐式转换和函数库
import org.apache.spark.sql.functions._
import spark.implicits._
// 2. 从Kafka读取数据,底层采用New Consumer API
val iotStreamDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1.itcast.cn:9092")
.option("subscribe", "iotTopic")
// 设置每批次消费数据最大值
.option("maxOffsetsPerTrigger", "100000")
.load()
// 3. 对获取数据进行解析,封装到DeviceData中
val etlStreamDF: DataFrame = iotStreamDF
// 获取value字段的值,转换为String类型
.selectExpr("CAST(value AS STRING)")
// 将数据转换Dataset
.as[String] // 内部字段名为value
// 过滤数据
.filter(line => null != line && line.trim.length > 0)
// 解析JSON数据:{"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
.select(
get_json_object($"value", "$.device").as("device_id"),
get_json_object($"value", "$.deviceType").as("device_type"),
get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
get_json_object($"value", "$.time").cast(LongType).as("time")
)
// 4. 依据业务,分析处理
// TODO: signal > 30 所有数据,按照设备类型 分组,统计数量、平均信号强度
val resultStreamDF: DataFrame = etlStreamDF
// 信号强度大于10
.filter($"signal" > 30)
// 按照设备类型 分组
.groupBy($"device_type")
// 统计数量、评价信号强度
.agg(
count($"device_type").as("count_device"),
round(avg($"signal"), 2).as("avg_signal")
)
// 5. 启动流式应用,结果输出控制台
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Complete())
.format("console")
.option("numRows", "10")
.option("truncate", "false")
.start()
query.awaitTermination()
query.stop()
}
}
其中使用函数get_json_object提取JSON字符串中字段值,将最终结果打印控制台。
3 基于SQL分析
按照业务需求,从Kafka消费日志数据,提取字段信息,将DataFrame注册为临时视图,编写
SQL执行分析,代码如下:
package cn.itcast.spark.iot
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 对物联网设备状态信号数据,实时统计分析,基于SQL编程
* 1)、信号强度大于30的设备
* 2)、各种设备类型的数量
* 3)、各种设备类型的平均信号强度
*/
object IotStreamingOnlineSQL {
def main(args: Array[String]): Unit = {
// 1. 构建SparkSession会话实例对象,设置属性信息
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[3]")
.config("spark.sql.shuffle.partitions", "3")
.getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 2. 从Kafka读取数据,底层采用New Consumer API
val iotStreamDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1.itcast.cn:9092")
.option("subscribe", "iotTopic")
// 设置每批次消费数据最大值
.option("maxOffsetsPerTrigger", "100000")
.load()
// 3. 对获取数据进行解析,封装到DeviceData中
val etlStreamDF: DataFrame = iotStreamDF
// 获取value字段的值,转换为String类型
.selectExpr("CAST(value AS STRING)")
// 将数据转换Dataset
.as[String] // 内部字段名为value
// 过滤数据
.filter(line => null != line && line.trim.length > 0)
// 解析JSON数据:{"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
.select(
get_json_object($"value", "$.device").as("device_id"),
get_json_object($"value", "$.deviceType").as("device_type"),
get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
get_json_object($"value", "$.time").cast(LongType).as("time")
)
// 4. 依据业务,分析处理
// TODO: signal > 30 所有数据,按照设备类型 分组,统计数量、平均信号强度
// 4.1 注册DataFrame为临时视图
etlStreamDF.createOrReplaceTempView("view_tmp_stream_iots")
// 4.2 编写SQL执行查询
val resultStreamDF: DataFrame = spark.sql(
"""
|SELECT
| device_type, COUNT(device_type) AS count_device, ROUND(AVG(signal), 2) AS avg_signal
|FROM view_tmp_stream_iots
|WHERE signal > 30 GROUP BY device_type
|""".stripMargin)
// 5. 启动流式应用,结果输出控制台
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Complete())
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
println("===========================================")
println(s"BatchId = ${batchId}")
println("===========================================")
if (!batchDF.isEmpty) batchDF.coalesce(1).show(20, truncate = false)
}
.start()
query.awaitTermination()
query.stop()
}
}
运行流式应用,结果如下图所示:
4 时间概念
在SparkStreaming中窗口统计分析:Window Operation(设置窗口大小WindowInterval和滑动大小SlideInterval),按照Streaming 流式应用接收数据的时间进行窗口设计的,其实是不符合实际应用场景的。
例如,在物联网数据平台中,每个设备产生的数据,其中包含数据产生的时间,然而数据需要
经过一系列采集传输才能被流式计算框架处理:SparkStreaming,此过程需要时间的,再按照处理
时间来统计业务的时候,准确性降低,存在不合理性。
在结构化流Structured Streaming中窗口数据统计时间是基于数据本身事件时间EventTime字
段统计,更加合理性,官方文档:
http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#window-operations-on-event-time
在Streaming流式数据处理中,按照时间处理数据,其中时间有三种概念:
- 1)、事件时间EventTime,表示数据本身产生的时间,该字段在数据本身中;
- 2)、注入时间IngestionTime,表示数据到达流式系统时间,简而言之就是流式处理系统接收到
数据的时间; - 3)、处理时间ProcessingTime,表示数据被流式系统真正开始计算操作的时间。不同流式计算框架支持时间不一样,SparkStreaming框架仅仅支持处理时间ProcessTime,StructuredStreaming支持事件时间和处理时间,Flink框架支持三种时间数据操作,实际项目中往往针对【事件时间EventTime】进行数据处理操作,更加合理化。
5 event-time 窗口分析
基于事件时间窗口聚合操作:基于窗口的聚合(例如每分钟事件数)只是事件时间列上特殊类型的分组和聚合,其中每个时间窗口都是一个组,并且每一行可以属于多个窗口/组。事件时间EventTime是嵌入到数据本身中的时间,数据实际真实产生的时间。例如,如果希望获得每分钟由物联网设备生成的事件数,那么可能希望使用生成数据的时间(即数据中的事件时间event time),而不是Spark接收数据的时间(receive time/archive time)。
这个事件时间很自然地用这个模型表示,设备中的每个事件(Event)都是表中的一行(Row),而事件时间(Event Time)是行中的一列值(Column Value)。
因此,这种基于事件时间窗口的聚合查询既可以在静态数据集(例如,从收集的设备事件日志中)上定义,也可以在数据流上定义,从而使用户的使用更加容易。修改词频统计程序,数据流包含每行数据以及生成每行行的时间。希望在10分钟的窗口内对单词进行计数,每5分钟更新一次,如下图所示:
单词在10分钟窗口【12:00-12:10、12:05-12:15、12:10-12:20】等之间接收的单词中计数。注意,
【12:00-12:10】表示处理数据的事件时间为12:00之后但12:10之前的数据。思考一下,12:07的一条数据,应该增加对应于两个窗口12:00-12:10和12:05-12:15的计数。基于事件时间窗口统计有两个参数索引:分组键(如单词)和窗口(事件时间字段)。
为了演示案例,将上述案例中的每5分钟统计最近10分钟窗口改为每5秒统计最近10秒窗口数
据,测试数据集:
2019-10-12 09:00:02,cat dog
2019-10-12 09:00:03,dog dog
2019-10-12 09:00:07,owl cat
2019-10-12 09:00:11,dog
2019-10-12 09:00:13,owl
案例中三个时间范围,说明如下:
1、触发时间间隔,trigger interval:5秒 (案例:5分钟)
2、事件时间窗口大小,window interval:10秒(案例:10分钟)
3、滑动大小,slider interval:5秒(案例:5分钟)
官方案例演示代码如下:
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 基于Structured Streaming 模块读取TCP Socket读取数据,进行事件时间窗口统计词频WordCount,将结果打印到控制台
* TODO:每5秒钟统计最近10秒内的数据(词频:WordCount)
*
* EventTime即事件真正生成的时间:
* 例如一个用户在10:06点击 了一个按钮,记录在系统中为10:06
* 这条数据发送到Kafka,又到了Spark Streaming中处理,已经是10:08,这个处理的时间就是process Time。
*
* 测试数据:
* 2019-10-12 09:00:02,cat dog
* 2019-10-12 09:00:03,dog dog
* 2019-10-12 09:00:07,owl cat
* 2019-10-12 09:00:11,dog
* 2019-10-12 09:00:13,owl
*/
object StructuredWindow {
def main(args: Array[String]): Unit = {
// 1. 构建SparkSession实例对象,传递sparkConf参数
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
// 2. 使用SparkSession从TCP Socket读取流式数据
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1.itcast.cn")
.option("port", 9999)
.load()
// 3. 针对获取流式DStream进行词频统计
val resultStreamDF = inputStreamDF
// 将DataFrame转换为Dataset操作,Dataset是类型安全,强类型
.as[String]
.filter(line => null != line && line.trim.length > 0)
// 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
.flatMap { line =>
val arr = line.trim.split(",")
arr(1).split("\\\\s+").map(word => (Timestamp.valueOf(arr(0)), word))
}
// 设置列的名称
.toDF("insert_timestamp", "word")
// TODO:设置基于事件时间(event time)窗口 -> insert_timestamp, 每5秒统计最近10秒内数据
/*
1. 先按照窗口分组、2. 再对窗口中按照单词分组、 3. 最后使用聚合函数聚合
*/
.groupBy(
window($"insert_timestamp", "10 seconds", "5 seconds"), $"word"
).count()
.orderBy($"window") // 按照窗口字段降序排序
/*
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- word: string (nullable = true)
|-- count: long (nullable = false)
*/
//resultStreamDF.printSchema()
// 4. 将计算的结果输出,打印到控制台
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Complete())
.format("console")
.option("numRows", "100")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start(以上是关于大数据Spark物联网设备数据分析的主要内容,如果未能解决你的问题,请参考以下文章