Spark Job: Real-Time Analysis of Spring Boot Logs
Posted by 一加六
The project was built on cloud servers; since the free-tier servers I was using have limited performance, I designed the architecture below.
Features and Design
(big data cluster + architecture design + functional analysis and design)
Overall architecture diagram
Features:
Order volume statistics and analysis
Total historical transaction amount
Real-time and offline analysis of popular categories
Real-time and offline analysis of popular products
Active-user statistics and analysis
Project Implementation
Deploying the SpringBoot tmall shop
Pull the tmall Spring Boot project from Git onto the server, configure MySQL, create the corresponding database and run the SQL file to import the data, then start the Spring Boot application; its log files are written to /root/logs/info/.
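The streaming job shown later splits every log line on "-" and keeps only lines that produce exactly three fields, so the layout is assumed to be roughly time-level-action. A minimal parsing sketch with a hypothetical log line (the project's actual log pattern is not shown in this post):

object LogLineParseDemo {
  def main(args: Array[String]): Unit = {
    // hypothetical log line; the real layout depends on the tmall project's log pattern
    val line = "21:35:07-INFO-分类ID为5"
    val fields = line.split("-")
    if (fields.length == 3) {
      // fields(2) is the action text that the streaming job counts per micro-batch
      println(s"time=${fields(0)}, level=${fields(1)}, action=${fields(2)}")
    }
  }
}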
Flume collection
Flume sends the collected data in two directions: one copy is written to HDFS, the other goes into a Kafka channel.
The data stored in HDFS is used for offline analysis, while the Kafka channel hands the events to Spark Streaming for real-time processing.
Data flow diagram
The Flume agent configuration file is as follows:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a3.sources = r3
a3.sinks = sinkhdfs
a3.channels = ch1 kafka-channel

# Source: tail the Spring Boot log file and fan events out to both channels
a3.sources.r3.channels = ch1 kafka-channel
#a3.sources.r3.type = spooldir
#a3.sources.r3.spoolDir = /root/logs/info
#a3.sources.r3.ignorePattern = ^(.)*\.tmp$
a3.sources.r3.type = exec
a3.sources.r3.command = tail -F /root/logs/info/info.log

# Memory channel ch1 feeds the HDFS sink (offline analysis)
a3.channels.ch1.type = memory
a3.channels.ch1.capacity = 100000
a3.channels.ch1.transactionCapacity = 1000
a3.channels.ch1.keep-alive = 10

# Kafka channel delivers events directly to the tmalllog topic (no Kafka sink needed)
a3.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
a3.channels.kafka-channel.kafka.bootstrap.servers = master:9092,slave2:9092
a3.channels.kafka-channel.kafka.topic = tmalllog
a3.channels.kafka-channel.kafka.producer.acks = 1
a3.channels.kafka-channel.parseAsFlumeEvent = false
# leftover from an earlier sink-based setup; sink k1 is not declared in this agent
#a3.sinks.k1.serializer.class = kafka.serializer.StringEncoder

# HDFS sink for offline analysis. NOTE: the original post declares sinkhdfs above but
# omits its definition; the path and settings below are assumed examples only.
a3.sinks.sinkhdfs.type = hdfs
a3.sinks.sinkhdfs.channel = ch1
a3.sinks.sinkhdfs.hdfs.path = hdfs://master:9000/flume/tmall/%Y%m%d
a3.sinks.sinkhdfs.hdfs.fileType = DataStream
a3.sinks.sinkhdfs.hdfs.useLocalTimeStamp = true
Kafka + Spark Streaming real-time processing
This step requires a ZooKeeper and a Kafka cluster; the Spark Streaming application consumes the messages produced into Kafka by the Flume Kafka channel.
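Before wiring everything together it helps to check the topic from the command line. A rough sketch, assuming Kafka's bundled scripts are on the PATH and a Kafka version whose kafka-topics.sh still accepts the --zookeeper flag (the partition and replication counts are arbitrary examples, not from the original post):

# create the topic that the Flume Kafka channel writes to
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 2 --partitions 3 --topic tmalllog
# watch events arriving before starting the Spark job
kafka-console-consumer.sh --bootstrap-server master:9092 --topic tmalllog --from-beginning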
Writing the Spark Streaming application
(1) Add the Spark and Kafka Maven dependencies (only the main ones are shown):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- spark.version is assumed to be 2.2.3, matching the spark-streaming-kafka artifact below -->
    <properties>
        <spark.version>2.2.3</spark.version>
    </properties>
    <dependencies>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Kafka integration for Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.3</version>
        </dependency>
        <!-- MySQL JDBC driver, needed for the JDBC writes below (version is an assumption) -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
    </dependencies>
</project>
Real-time processing code:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.regexp_extract
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import java.util.{Date, Properties}

object Kafka_spark_streaming {
  def main(args: Array[String]): Unit = {
    // Path where Kafka offsets (checkpoints) are stored
    val checkpointPath = "file:///export/data/kafka/checkpoint/kafka-direct"
    val conf = new SparkConf()
      .setAppName("ScalaKafkaStreaming")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointPath)
    // Reuses the SparkContext created above; used for DataFrame/JDBC operations
    val spark: SparkSession = SparkSession.builder().master("local").appName("sqlDemo").getOrCreate()

    val bootstrapServers = "master:9092,slave1:9092,slave2:9092"
    val groupId = "flume"
    val topicName = "tmalllog"
    val maxPoll = 500
    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )
    case class schema(mytime: String, action: String, frequency: Int)

    // Direct stream subscribed to the tmalllog topic
    val kafkaTopicDS = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(topicName), kafkaParams))

    import spark.implicits._
    // JDBC connection properties for writing results to MySQL
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "password")
    properties.setProperty("driver", "com.mysql.jdbc.Driver")
    val uri = "jdbc:mysql://slave2:3306/tmalldata?useSSL=false"

    kafkaTopicDS.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Business logic: count each action within the current micro-batch
        val now: Long = new Date().getTime
        val now2: String = now.toString
        val action_df = rdd.map(_.value)
          .map(_.split("-"))
          .filter(x => x.length == 3)   // keep only well-formed log lines
          .map(x => x(2))               // the third field is the action description
          .map(x => (x, 1))
          .reduceByKey(_ + _)
          .map(x => (now2, x._1, x._2))
          .toDF("mytime", "action", "frequency")

        // Popular categories
        val top_category = action_df.select("*").where("action like '%分类ID为%'").orderBy(action_df("frequency").desc)
        if (top_category.count() > 0) {
          top_category.show()
          top_category.write.mode("append").jdbc(uri, "category", properties)
        }
        // Popular products
        val product_Popular_Buy = action_df.select("*").where("action like '%通过产品ID获取产品信息%'").orderBy(action_df("frequency").desc)
        if (product_Popular_Buy.count() > 0) {
          product_Popular_Buy.show()
          product_Popular_Buy.write.mode("append").jdbc(uri, "product", properties)
        }
        // Active (logged-in) users
        val Active_users = action_df.select("*").where("action like '%用户已登录,用户ID%'").orderBy(action_df("frequency"))
        if (Active_users.count() > 0) {
          Active_users.show()
          Active_users.write.mode("append").jdbc(uri, "activeusers", properties)
        }
        // Order amounts: extract the numeric value from the action text
        val money = action_df.select("*").where("action like '%总共支付金额为%'").orderBy(action_df("frequency").desc)
        val money2 = money.withColumn("single_transaction", regexp_extract($"action", "\\d+", 0))
        if (money2.count() > 0) {
          money2.show()
          money2.write.mode("append").jdbc(uri, "trading", properties)
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
Run the application and check the console output.
Then package the project into a jar in IDEA, upload it to the server, and submit it with spark-submit; the job writes the real-time results into the MySQL database.
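The submit command might look roughly like the sketch below; the jar name and paths are placeholders, and the MySQL driver jar still has to be made available to the job (the --packages coordinate matches the spark-streaming-kafka dependency used above):

spark-submit \
  --class Kafka_spark_streaming \
  --master local[2] \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.2.3 \
  --jars /root/jars/mysql-connector-java-5.1.47.jar \
  /root/jars/tmall-streaming.jar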
Currently logged-in users
Current popular categories
Current popular products
Order count
Offline data analysis with Spark
Creating Hive table partitions
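The offline part is not included in the original post. A partitioned Hive table over the Flume output in HDFS might be created along these lines; the table name, column, and HDFS location are assumptions, not taken from the original:

import org.apache.spark.sql.SparkSession

object CreateTmallLogTable {
  def main(args: Array[String]): Unit = {
    // Hive support is needed so the table is registered in the Hive metastore
    val spark = SparkSession.builder()
      .appName("CreateTmallLogTable")
      .enableHiveSupport()
      .getOrCreate()

    // raw log lines collected by Flume, partitioned by day
    spark.sql(
      """CREATE EXTERNAL TABLE IF NOT EXISTS tmall_log (line STRING)
        |PARTITIONED BY (dt STRING)
        |LOCATION 'hdfs://master:9000/flume/tmall'""".stripMargin)

    // register one day's directory as a partition (hypothetical date)
    spark.sql("ALTER TABLE tmall_log ADD IF NOT EXISTS PARTITION (dt='20210501') " +
      "LOCATION 'hdfs://master:9000/flume/tmall/20210501'")

    spark.stop()
  }
}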
Lessons Learned
Cloud-server environment issues (the main ones)
1. Public vs. private IP addresses when building the ZooKeeper, Kafka, and Hadoop clusters: the nodes could not recognize each other. According to the documentation and blog posts I consulted, a cloud server only has an internal network interface; the public address is not configured on the machine itself, so a process cannot bind to the public IP. The /etc/hosts file therefore has to be modified as follows.
Solution: on each server, map its own hostname to its private IP and map the other servers' hostnames to their public IPs.
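For example, on the master node the hosts file might look like the sketch below; the IP addresses are placeholders, while master, slave1, and slave2 are the hostnames used elsewhere in this post:

# /etc/hosts on master: own hostname -> private IP, other nodes -> public IPs
172.16.0.10    master    # this machine's private (internal) IP
203.0.113.21   slave1    # slave1's public IP
203.0.113.22   slave2    # slave2's public IP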
2. Hadoop high availability: the HA Hadoop setup caused Flume's log collection to HDFS to fail.
Solution: add the Hadoop configuration files under flume/lib.
3. Ports: some processes could not start because the required ports on the servers were not open.
Solution: open the ports in the cloud provider's security group and in the BT (宝塔) panel.
Commonly used default ports for this environment:
Hadoop: 9000, 50070, 50010
ZooKeeper: 2181, 2888, 3888
Kafka: 9092
MySQL: 3306
Spark: 7077