目录 sparkstreaming+flume+kafka实时流式处理完整流程 一、前期准备 二、实现步骤 1.引入依赖 2.日志收集服务器 3.日志接收服务器 4、spark集群处理接收数据并写入数据库 5、测试结果
sparkstreaming+flume+kafka实时流式处理完整流程 一、前期准备
1、环境准备,四台测试服务器
spark集群三台,hadoop02,hadoop03,hadoop04
kafka集群三台,hadoop02,hadoop03,hadoop04
zookeeper集群三台,hadoop02,hadoop03,hadoop04
日志接收服务器, hadoop02
日志收集流程:
日志收集服务器->日志接收服务器->kafka集群->spark集群处理
说明: 日志收集服务器,在实际生产中很有可能是应用系统服务器,日志接收服务器为大数据服务器中一台,日志通过网络传输到日志接收服务器,再入集群处理。
因为,生产环境中,往往网络只是单向开放给某台服务器的某个端口访问的。
Flume版本:apache-flume-1.9.0-bin.tar.gz,该版本已经较好地集成了对kafka的支持
2.1 实时计算 跟实时系统类似(能在严格的时间限制内响应请求的系统),例如在股票交易中,市场数据瞬息万变,决策通常需要秒级甚至毫秒级。通俗来说,就是一个任务需要在非常短的单位时间内计算出来,这个计算通常是多次的。
2.2 流式计算 通常指源源不断的数据流过系统,系统能够不停地连续计算。这里对时间上可能没什么特别限制,数据流入系统到产生结果,可能经过很长时间。比如系统中的日志数据、电商中的每日用户访问浏览数据等。
2.3 实时流式计算 将实时计算和流式数据结合起来,就是实时流式计算,也就是大数据中通常说的实时流处理。数据源源不断的产生的同时,计算时间上也有了严格的限制。比如,目前电商中的商品推荐,往往在你点了某个商品之后,推荐的商品都是变化的,也就是实时的计算出来推荐给你的。再比如你的手机号,在你话费或者流量快用完时,实时的给你推荐流量包套餐等。
二、实现步骤
1.引入依赖
!注意:需要与自己安装对应版本一致
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>ScalaProject</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.4.8</spark.version>
<spark.artifact.version>2.11</spark.artifact.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_$spark.artifact.version</artifactId>
<version>$spark.version</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>$hadoop.version</version>
</dependency>
<!-- 使用scala2.11.8进行编译和打包 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>$scala.version</version>
</dependency>
<!-- 引入mysql驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>$spark.version</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
<build>
<!-- 指定scala源代码所在的目录 -->
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/main/scala</testSourceDirectory>
<plugins>
<!--对src/main/java下的后缀名为.java的文件进行编译 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<!-- scala的打包插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.5.4</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>$scala.version</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
</project>
2.日志收集服务器
配置flume动态收集特定的日志,collect.conf 配置如下:
# Name the components on this agent
a1.sources = tailsource-1
a1.sinks = remotesink
a1.channels = memoryChnanel-1
# Describe/configure the source
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.command = tail -F /training/data/logs/my1.log
a1.sources.tailsource-1.channels = memoryChnanel-1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.memoryChnanel-1.type = memory
a1.channels.memoryChnanel-1.keep-alive = 10
a1.channels.memoryChnanel-1.capacity = 100000
a1.channels.memoryChnanel-1.transactionCapacity = 100000
# Bind the source and sink to the channel
a1.sinks.remotesink.type = avro
a1.sinks.remotesink.hostname = hadoo02
a1.sinks.remotesink.port = 6666
a1.sinks.remotesink.channel = memoryChnanel-1
注意:需创建/training/data/logs/my1.log文件
启动日志收集端脚本:
bin/flume-ng agent --conf conf --conf-file conf/collect.conf --name a1 -Dflume.root.logger=INFO,console
3.日志接收服务器
配置flume实时接收日志,kafka-flume.conf 配置如下:
| |
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
producer.sources.s.type = avro
producer.sources.s.bind = hadoop02
producer.sources.s.port = 6666
producer.sources.s.channels = c
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.topic = flumetopic
producer.sinks.r.brokerList = hadoop02:9092,hadoop03:9092,hadoop04:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 20
producer.sinks.r.channel = c1
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = org.apache.flume.channel.kafka.KafkaChannel
producer.channels.c.capacity = 10000
producer.channels.c.transactionCapacity = 1000
producer.channels.c.brokerList=hadoop02:9092,hadoop03:9092,hadoop04:9092
producer.channels.c.topic=channel1
producer.channels.c.zookeeperConnect=hadoop02:2181,hadoop03:2181,hadoop04:2181 启动接收端脚本:
bin/flume-ng agent --conf conf --conf-file conf/kafka-flume.conf --name producer -Dflume.root.logger=INFO,console
| |
4、spark集群处理接收数据并写入数据库 package sparkclass.sparkstreaming
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import java.sql.Connection, DriverManager, PreparedStatement
/**
* @author Administrator
*/
object KafkaDataTest1
def main(args: Array[String]): Unit =
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);
val conf = new SparkConf().setAppName("stocker").setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(3))
val sc1 = ssc.sparkContext
ssc.checkpoint("hdfs://hadoop02:9000/spark/checkpoint")
// Kafka configurations
val topics = Set("flumetopic")
val brokers = "hadoop02:9092,hadoop03:9092,hadoop04:9092"
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> "hadoop02:9092", //从那些broker消费数据
"key.deserializer" -> classOf[StringDeserializer], //发序列化的参数,因为写入kafka的数据经过序列化
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test-consumer-group", //指定group.id
"auto.offset.reset" -> "latest",//指定消费的offset从哪里开始:① earliest:从头开始 ;② latest从消费者启动之后开始
"enable.auto.commit" -> (false: java.lang.Boolean)
//是否自动提交偏移量 offset 。默认值就是true【5秒钟更新一次】,
// true 消费者定期会更新偏移量 groupid,topic,parition -> offset ;
// "false" 不让kafka自动维护偏移量 手动维护偏移
)
val kafkaStream = KafkaUtils.createDirectStream[String, String]
(ssc,PreferConsistent,Subscribe[String,String](topics, kafkaParams))
val mapDStream: DStream[(String, String)] = kafkaStream.map(record =>
(record.key,record.value))
val urlClickLogPairsDStream: DStream[(String, Int)] =
mapDStream.flatMap(_._2.split(" ")).map((_, 1))
val urlClickCountDaysDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) =>
v1 + v2
,
Seconds(30),
Seconds(3)
);
//写入数据库
//定义函数用于累计每个单词出现的次数
val addWordFunction = (currentValues:Seq[Int],previousValueState:Option[Int])=>
//通过spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和
val currentCount = currentValues.sum
//已经进行累加的值
val previousCount = previousValueState.getOrElse(0)
//返回累加后的结果,是一个Option[Int]类型
Some(currentCount+previousCount)
val result = urlClickLogPairsDStream.updateStateByKey(addWordFunction)
//将DStream中的数据存储到mysql数据库中
result.foreachRDD(
rdd=>
val url = "jdbc:mysql://localhost:3306/hadoop?useUnicode=true&characterEncoding=UTF-8"
val user = "root"
val password = "LIUJIANG1313"
Class.forName("com.mysql.jdbc.Driver").newInstance()
//截断数据表,将数据表原有的数据进行删除
var conn1: Connection = DriverManager.getConnection(url,user,password)
//此句是清空数据表
val sql1 = "truncate table word"
var stmt1 : PreparedStatement = conn1.prepareStatement(sql1)
stmt1.executeUpdate()
conn1.close()
rdd.foreach(
data=>
//将数据库数据更新为最新的RDD中的数据集
var conn2: Connection = DriverManager.getConnection(url,user,password)
val sql2 = "insert into word(wordName,count) values(?,?)"
var stmt2 : PreparedStatement = conn2.prepareStatement(sql2)
stmt2.setString(1,data._1.toString)
stmt2.setString(2,data._2.toString)
stmt2.executeUpdate()
conn2.close()
)
)
urlClickCountDaysDStream.print();
ssc.start()
ssc.awaitTermination()
5、测试结果
往日志中依次追加三次日志
idea运行结果
查看数据库
| |