python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示
Posted 靠谱杨的挨踢IT生活
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示相关的知识,希望对你有一定的参考价值。
使用爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集、分析、展示
主要工作流程如下所示:
其中爬虫获取实时数据,并把数据实时传输到Linux本地文件夹中。
使用Flume实时监控该文件夹,如果发现文件内容变动则进行处理,将数据抓取并传递到Kafka消息队列中。
之后使用Spark Streaming 实时处理Kafka通道中的数据,并写入本地mysql数据库中,之后读取mysql数据库中的数据并基于Echart图表对数据进行实时动态展示。
一、实时数据的模拟
案例简化了第一步的流程,使用模拟数据进行测试,代码如下:
import datetime
import random
import time
import paramiko
hostname = "hadoop102"
port = 22
username = "root"
password = "000429"
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname, port, username, password, compress=True)
sftp_client = client.open_sftp()
# try:
# for line in remote_file:
# print(line)
# finally:
# remote_file.close()
#获取系统时间
num1=3000
for i in range(1000):
remote_file = sftp_client.open("/opt/module/data/test1.csv", a) # 文件路径
time1 = datetime.datetime.now()
time1_str = datetime.datetime.strftime(time1, %Y-%m-%d %H:%M:%S)
print("当前时间: " + time1_str)
time.sleep(random.randint(1,3))
num1_str=str(num1+random.randint(-1300,1700))
print("当前随机数: "+num1_str)
remote_file.write(time1_str+","+num1_str+"\\n")
remote_file.close()
- 主要过程
- 在/opt/module/data/路径下建立test1.csv文件
- 代码实现远程连接虚拟机hadoop102并以root用户身份登录,打开需要上传的文件目录。
- 使用一个for循环间隔随机1到3秒向文件中写入一些数据。
二、Flume实时监控文件
- 进入/opt/module/flume/job路径编辑配置文件信息(myflume.conf)
内容如下:其中指定了被监控文件的路径,Kafka服务主机地址,Kafka主题和序列化等信息
#给agent中的三个组件source、sink和channel各起一个别名,a1代表为agent起的别名
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source属性配置信息
a1.sources.r1.type = exec
#a1.sources.r1.bind = localhost
#a1.sources.r1.port = 44444
a1.sources.r1.command=tail -F /opt/module/data/test1.csv
# sink属性配置信息
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers:hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic=first
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoer
#channel属性配置信息
#内存模式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#传输参数设置
a1.channels.c1.transactionCapacity=100
#绑定source和sink到channel上
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
- 在/opt/module/flume 路径下开启Flume,此时Flume开始监控目标文件(job/myflume.conf)
bin/flume-ng agent -c conf/ -n a1 -f job/myflume.conf -Dflume.root.logger=INFO,console
三、开启Kafka并使用Spark Streaming完成数据的接收
- 首先需要开启集群的zookeeper服务
- 之后开启Kafka服务
- 开启Kafka后,新建一个名为first的主题(topic)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
- 新建Maven项目,编写代码,Kafka的topic主题的消费者
pom.xml配置如下:注意此处各个资源的版本号一定要与本机(IDEA编译器)的Scala版本一致,博主为Scala 2.12.11
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.reliable.ycw</groupId>
<artifactId>spark-test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-network-common -->
<!--<dependency>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-network-common_2.12</artifactId>-->
<!--<version>3.0.0</version>-->
<!--</dependency>-->
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.18</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.11</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.12.11</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.11</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 该插件用于将 Scala 代码编译成 class 文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<!-- 声明绑定到 maven 的 compile 阶段 -->
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
消费者类代码如下:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import java.sql.DriverManager
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.streaming.Seconds, StreamingContext
/** Utility functions for Spark Streaming examples.*/
object StreamingExamples extends App
/** Set reasonable logging levels for streaming if the user has not configured log4j.*/
// def setStreamingLogLevels()
// val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
// if (!log4jInitialized)
// // We first log Appsomething to initialize Sparks default logging, then we override the
// // logging level.
// logInfo("Setting log level to [WARN] for streaming example." +
// " To override add a custom log4j.properties to the classpath.")
// Logger.getRootLogger.setLevel(Level.WARN)
//
//
val conf=new SparkConf().setMaster("local").setAppName("jm")
.set("spark.streaming.kafka.MaxRatePerPartition","3")
.set("spark.local.dir","./tmp")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//创建上下文,2s为批处理间隔
val ssc = new StreamingContext(conf,Seconds(5))
//配置kafka参数,根据broker和topic创建连接Kafka 直接连接 direct kafka
val KafkaParams = Map[String,Object](
//brokers地址
"bootstrap.servers"->"hadoop102:9092,hadoop103:9092,hadoop104:9092",
//序列化类型
"key.deserializer"->classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "MyGroupId",
//设置手动提交消费者offset
"enable.auto.commit" -> (false: java.lang.Boolean)//默认是true
)
//获取KafkaDStream
val kafkaDirectStream = KafkaUtils.createDirectStream[String,String](ssc,
//
PreferConsistent,Subscribe[String,String](List("first"),KafkaParams))
kafkaDirectStream.print()
var num=kafkaDirectStream.count()
var num_1=""
num foreachRDD (x =>
// var res=x.map(line=>line.split(","))
val connection = getCon()
var time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date).toString
var sql = "insert into content_num values(" + time + "," + x.collect()(0) + ")"
connection.createStatement().execute(sql)
connection.close()
)
// print("sdfasdf")
// print(num_1)
//根据得到的kafak信息,切分得到用户电话DStream
// val nameAddrStream = kafkaDirectStream.map(_.value()).filter(record=>
// val tokens: Array[String] = record.split(",")
// tokens(1).toInt==0
// )
//
// nameAddrStream.print()
// .map(record=>
// val tokens = record.split("\\t")
// (tokens(0),tokens(1))
// )
//
//
// val namePhoneStream = kafkaDirectStream.map(_.value()).filter(
// record=>
// val tokens = record.split("\\t")
// tokens(2).toInt == 1
//
// ).map(record=>
// val tokens = record.split("\\t")
// (tokens(0),tokens(1))
// )
//
// //以用户名为key,将地址电话配对在一起,并产生固定格式的地址电话信息
// val nameAddrPhoneStream = nameAddrStream.join(namePhoneStream).map(
// record=>
// s"姓名:$record._1,地址:$record._2._1,邮编:$record._2._2"
//
// )
// //打印输出
// nameAddrPhoneStream.print()
//开始计算
ssc.start()
ssc.awaitTermination()
def getCon()=
Class.forName("com.mysql.cj.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8","root","000429")
这段代码指定了虚拟机中Kafka的主题信息,并从中定时获取(博主设置的为5秒)期间变化的信息量,完成计算后把本机的时间和信息变化量存储到本地Mysql数据库中
- 注意指定时区和编码
jdbc:mysql://localhost:3306/spark?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8
四、可视化
使用Echarts平滑折线图完成数据的展示(python flask框架)
- 后台读取mysql的数据
import pymysql
def get_conn():
"""
获取连接和游标
:return:
"""
conn=pymysql.connect(host="127.0.0.1",
user="root",
password="000429",
db="spark",
charset="utf8")
cursor=conn.cursor()
return conn,cursor
def close_conn(conn, cursor):
"""
关闭连接和游标
:param conn:
:param cursor:
:return:
"""
if cursor:
cursor.close()
if conn:
conn.close()
#query
def query(sql,*args):
"""
通用封装查询
:param sql:
:param args:
:return:返回查询结果 ((),())
"""
conn , cursor= get_conn()
print(sql)
cursor.execute(sql)
res = cursor.fetchall()
close_conn(conn , cursor)
return res
def dynamic_bar():
# 获取数据库连接
conn, cursor = get_conn()
if (conn != None):
print("数据库连接成功!")
typenumsql = "select * from content_num order by time desc limit 11;"
detail_sql = ""
res_title = query(typenumsql)
type_num = [] # 存储类别+数量
for item1 in res_title:
type_num.append(item1)
return type_num
- 路由获取后台数据
#获取 动态 柱状图数据
@app.route(/dynamic_bar)
def dynamic_bar():
res_list=spark_sql.dynamic_bar()
my_list=[]
list_0=[]
list_1=[]
for item in res_list:
list_0.append(item[0])
list_1.append(item[1])
my_list.append(list_0)
my_list.append(list_1)
return "data":my_list
- 前台绘制折线图
<!DOCTYPE html>
<html style="height: 100%">
<head>
<meta charset="utf-8">
</head>
<body style="height: 100%; margin: 0">
<div id="container" style="height: 100%"></div>
<script type="text/javascript" src="https://cdn.jsdelivr.net/npm/echarts@5.2.2/dist/echarts.min.js"></script>
<script src="../static/js/jquery-3.3.1.min.js"></script>
</body>
</html>
<script>
var dom = document.getElementById("container");
var myChart = echarts.init(dom);
var app = ;
var option;
</script>
<script type="text/javascript">
option =
tooltip:
trigger: axis,
axisPointer:
type: shadow
,
grid:
left: 3%,
right: 4%,
bottom: 3%,
containLabel: true
,
xAxis: [
type: category,
data: [],
axisTick:
alignWithLabel: true
],
yAxis: [
type: value
],
series: [
name: Direct,
type: bar,
barWidth: 60%,
data: []
]
;
if (option && typeof option === object)
myChart.setOption(option);
function update()
$.ajax(
url:"/dynamic_bar",
async:true,
success:function (data)
option.xAxis[0].data=data.data[0]
option.series[0].data=data.data[1]
myChart.setOption(option);
,
error:function (xhr,type,errorThrown)
alert("出现错误!")
)
setInterval("update()",100)
</script>
可视化这里需要注意的点:
- 注意先引入echarts.min.js再引入jquery-3.3.1.min.js
- 注意指定放置图像的div块的大小
- 把赋值方法放在图像初始化配置代码的后面
- 注意设置方法循环执行:setInterval("update()",100)
小结:整个流程的关键在于对实时数据的监控和展示,首先要保证数据传输的动态性,其次要保证Flume实时监控数据的变化。其中使用Kafka的目的在于当数据量足够大的时候,往往会出现数据的监控和采集速度跟不上数据的变化,所以采用Kafka消息队列机制,让其缓冲数据以实现大数据量的处理,后续需要编写Spark Streaming代码完成对消息的收集处理(存入本地mysql数据库),最后读取数据库数据并用折线图完成动态展示效果,数据库的数据是实时变动的,这就需要在读取的时候要读到最新进来的数据,这样才能看到图线的动态效果。(下图的图线会随着数据的变化动态改变!)
作者:靠谱杨
我可能不是天才,但我会努力成为人才。
更多日常分享尽在我的VX公众号:小杨的挨踢IT生活
以上是关于python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示的主要内容,如果未能解决你的问题,请参考以下文章