全网详解从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统

Posted 王小王_123

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了全网详解从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统相关的知识,希望对你有一定的参考价值。


目录

​万事具备之巧借东风​

​预备知识​

​环境搭建​

​Spark安装​

​Kafka安装​

​Kafka核心知识介绍​

​Kafka开启及测试服务​

​Python依赖库​

​PyCharm安装​

​搭建总结​

​八仙过海之各显神通​

​数据预处理​

​运行效果代码​

​代码展示​

​神笔马良之画龙点睛​

​Spark Streaming实时处理数据​

​配置Spark开发Kafka环境​

​建立pyspark项目​

​华佗在世之妙手回春​

​结果展示之移花接木​

​app.py(直接运行)​

​index.html​

​总结​

​每文一语​


万事具备之巧借东风

预备知识

Linux系统命令使用、了解如何安装Python库、安装kafka。

熟悉Linux基本操作Pycharm的安装Spark安装Kafka安装

环境搭建

Spark安装

至于如何安装好spark,我这里就不详细介绍了,请点击标题,即可跳转到文章详情页,里面有spark的安装资料和教程。

Kafka安装

点击此处下载,下载kafka_2.11-2.4.0.tgz。此安装包内已经附带zookeeper,不需要额外安装zookeeper.按顺序执行如下步骤:

首先将下载好的安装包放在我们虚拟机里面(Ubuntu)

使用命令进行解压

sudo tar -zxf /home/hadoop/kafka/kafka_2.11-2.4.0.tgz -C /home/hadoop/kafka

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_kafka

解压成功之后,需要我们对其进行改名,方便我们后续的操作

cd /home/hadoop/kafka
sudo mv kafka_2.11-2.4.0/  kafka

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_hadoop_02

Kafka核心知识介绍

下面介绍Kafka相关概念,以便运行下面实例的同时,更好地理解Kafka.
1. Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
2. Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
3. Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition.
4. Producer
负责发布消息到Kafka broker
5. Consumer
消息消费者,向Kafka broker读取消息的客户端。
6. Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

Kafka开启及测试服务

接下来在Ubuntu系统环境下测试简单的实例。Mac系统请自己按照安装的位置,切换到相应的指令。按顺序执行如下命令:

进入kafka所在的目录

cd /home/hadoop/kafka/kafka

输入该命令

bin/zookeeper-server-start.sh config/zookeeper.properties

命令执行后不会返回Shell命令输入状态,zookeeper就会按照默认的配置文件启动服务,请千万不要关闭当前终端.启动新的终端,输入如下命令:

cd /home/hadoop/kafka/kafka
bin/kafka-server-start.sh config/server.properties

 kafka服务端就启动了,请千万不要关闭当前终端。启动另外一个终端,输入如下命令(测试):

cd /home/hadoop/kafka/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_实时展示交易可视化数据_03

注意上面的步骤顺序缺一不可:初学者,千万要记住,先启动zookeeper,再启动kafka,这个很重要,不然会出错,切记!!!

topic是发布消息发布的category,以单节点的配置创建了一个叫dblab的topic.可以用list列出所有创建的topics,来查看刚才创建的主题是否存在。

bin/kafka-topics.sh --list --zookeeper localhost:2181

可以在结果中查看到dblab这个topic存在

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_大数据_04

接下来用producer生产点数据:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab

并尝试输入如下信息:

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_大数据_05

然后再次开启新的终端或者直接按CTRL+C退出。然后使用consumer来接收数据,输入如下命令:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab --from-beginning

便可以看到刚才产生的信息。说明kafka安装成功!!!

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_大数据_06


 

Python依赖库

本项目主要使用了两个Python库,Flask和Flask-SocketIO,这两个库的安装非常简单,请启动进入Ubuntu系统,打开一个命令行终端(可以使用快捷键Ctrl+Alt+T)。
Python之所以强大,其中一个原因是其丰富的第三方库。pip则是python第三方库的包管理工具。Python3对应的包管理工具是pip3。因此,需要首先在Ubuntu系统中安装pip3,命令如下

sudo apt-get install python3-pip

安装完pip3以后,可以使用如下Shell命令完成Flask和Flask-SocketIO这两个Python第三方库的安装以及与Kafka相关的Python库的安装:

pip3 install flask

pip3 install flask-socketio

pip3 install kafka-python

这些安装好的库在我们的程序文件的开头可以直接用来引用。比如下面的例子。

from flask import Flask
from flask_socketio import SocketIO
from kafka import KafkaConsumer

from import 跟直接import的区别举个例子来说明。
import socket的话,要用socket.AF_INET,因为AF_INET这个值在socket的名称空间下。
from socket import* 是把socket下的所有名字引入当前名称空间。

但是对于本次项目,我们使用的是pycharm开发工具,所以可以不用这样,我们直接使用anaconda里面的安装命令,这样更加的快捷。


 

PyCharm安装

pycharm的详解安装步骤,在之前就已经介绍的非常详细了,这里只需要点击标题即可

搭建总结

搭建成功我们就可以把我们的项目引入进来

首先利用pycharm,我们要安装第三方库

 pip --default-timeout=100 install kafka -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com

安装其他的第三方库,反正没有的都可以自己安装!

pip install flask_socketio

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_hadoop_07

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_kafka_08

这里先给出本项目Python工程的目录结构,后续的操作可以根据这个目录进行操作

Python工程目录结构

  1. data目录存放的是用户日志数据;
  2. scripts目录存放的是Kafka生产者和消费者;
  3. static/js目录存放的是前端所需要的js框架;
  4. templates目录存放的是html页面;
  5. app.py为web服务器,接收Spark Streaming处理后的结果,并推送实时数据给浏览器;

至此,本项目需要的开发环境及搭建就介绍完毕!


八仙过海之各显神通

数据预处理

数据集介绍

本项目的数据集压缩包为data​​数据集​​,有需要的可以在评论区留言QQ邮箱:456789321@qq.com

该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv. 在这个项目中只是用user_log.csv这个文件,下面列出文件user_log.csv的数据格式定义:

用户行为日志user_log.csv,日志中的字段定义如下:
1. user_id | 买家id
2. item_id | 商品id
3. cat_id | 商品类别id
4. merchant_id | 卖家id
5. brand_id | 品牌id
6. month | 交易时间:月
7. day | 交易事件:日
8. action | 行为,取值范围0,1,2,3,0表示点击,1表示加入购物车,2表示购买,3表示关注商品
9. age_range | 买家年龄分段:1表示年龄<18,2表示年龄在[18,24],3表示年龄在[25,29],4表示年龄在[30,34],5表示年龄在[35,39],6表示年龄在[40,49],7和8表示年龄>=50,0和NULL则表示未知
10. gender | 性别:0表示女性,1表示男性,2和NULL表示未知
11. province| 收获地址省份

数据具体格式如下:

user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
328862,844400,1271,2882,2661,08,29,0,1,1,山西

这个项目实时统计每秒中男女生购物人数,因此针对每条购物日志,我们只需要获取gender即可,然后发送给Kafka,接下来Spark Streaming再接收gender进行处理。

数据预处理

本项目使用Python对数据进行预处理,并将处理后的数据直接通过Kafka生产者发送给Kafka,这里需要先安装Python操作Kafka的代码库,请在Ubuntu中打开一个命令行终端,执行如下Shell命令来安装Python操作Kafka的代码库(备注:如果之前已经安装过,则这里不需要安装):

运行效果代码

注意:

在运行项目之前,首先要保证你的项目代码里面的第三方库是否已经全部安装完毕,如果没有,可以参考上面的步骤完成

其次在开启上述KafkaProducer和KafkaConsumer之前,需要先开启Kafka(分开执行,按照顺序,注意在开启kafka之前)

初学者,千万要记住,先启动zookeeper,再启动kafka,这个很重要,不然会出错,切记!!!

cd /home/hadoop/kafka/kafka

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

代码展示

producer.py

# coding: utf-8
from kafka import KafkaProducer
import csv
import time

producer = KafkaProducer(bootstrap_servers=localhost:9092)
csvfile = open("../data/user_log.csv","r")
reader = csv.reader(csvfile)

for line in reader:
gender = line[9]
if gender == gender:
continue
print(line[9])
time.sleep(0.1)
producer.send(sex,line[9].encode(utf8))

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_大数据_09

上述代码很简单,首先是先实例化一个Kafka生产者。然后读取用户日志文件,每次读取一行,接着每隔0.1秒发送给Kafka,这样1秒发送10条购物日志。这里发送给Kafka的topic为’sex’ 

consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer(result)
for msg in consumer:
print((msg.value).decode(utf8))

运行首先要运行producer.py,然后去运行consumer.py才可以正常展示和输出

如果报错:

报错原因:3.8版本中,async已经变成了关键字,所以导致不兼容
解决方案:执行 pip install kafka-python,就可以解决

pip install kafka-python

运行上面这条命令以后,这时,你会看到屏幕上会输出一行又一行的数字,类似下面的样子: 

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_大数据_10

如果有上述的输出,恭喜你,Python操作Kafka运行成功。接下来,第三部分将分析Spark Streaming如何处理Kafka的实时数据。


神笔马良之画龙点睛

Spark Streaming实时处理数据

Spark Streaming实时处理Kafka数据;

将处理后的结果发送给Kafka;

本项目在于实时统计每秒中男女生购物人数,而Spark Streaming接收的数据为1,1,0,2…,其中0代表女性,1代表男性,所以对于2或者null值,则不考虑。其实通过分析,可以发现这个就是典型的wordcount问题,而且是基于Spark流计算。女生的数量,即为0的个数,男生的数量,即为1的个数。

因此利用Spark Streaming接口reduceByKeyAndWindow,设置窗口大小为1,滑动步长为1,这样统计出的0和1的个数即为每秒男生女生的人数。

配置Spark开发Kafka环境

首先下载Spark连接Kafka的代码库。然后把下载的代码库放到目录

首先将:spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar这个文件直接复制粘贴在:/home/hadoop/spark/jars

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_大数据_11

 

然后在/home/hadoop/spark/jars目录下新建kafka目录,把/home/hadoop/kafka/kafka/libs下所有函数库复制到/home/hadoop/spark/jars/kafka目录下,命令如下:

cd /home/hadoop/spark/jars
mkdir kafka
cd kafka
cp /home/hadoop/kafka/kafka/libs/*  .

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_kafka_12

然后,修改 Spark 配置文件,命令如下

cd /home/hadoop/spark/conf
vim spark-env.sh

把 Kafka 相关 jar 包的路径信息增加到 spark-env.sh,修改后的 spark-env.sh 类似如下:

export SPARK_DIST_CLASSPATH=$classpath:/home/hadoop/spark/jars/kafka/*:/home/hadoop/kafka/kafka/libs/*

这就配置好了相关的参数

kafka_test.py

#!/home/hadoop/anaconda3/bin/python
from kafka import KafkaProducer
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext
import json
import sys


def KafkaWordCount(zkQuorum, group, topics, numThreads):
spark_conf = SparkConf().setAppName("KafkaWordCount").set(spark.io.compresssion.codec, snappy)
sc = SparkContext(conf=spark_conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint(".")
# 这里表示把检查点文件写入分布式文件系统HDFS,所以要启动Hadoop
# ssc.checkpoint(".")
topicAry = topics.split(",")
# 将topic转换为hashmap形式,而python中字典就是一种hashmap
topicMap =
for topic in topicAry:
topicMap[topic] = numThreads
lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(lambda x : x[1])
words = lines.flatMap(lambda x : x.split(" "))
wordcount = words.map(lambda x : (x, 1)).reduceByKeyAndWindow((lambda x,y : x+y), (lambda x,y : x-y), 1, 1, 1)
wordcount.foreachRDD(lambda x : sendmsg(x))
ssc.start()
ssc.awaitTermination()


# 格式转化,将[["1", 3], ["0", 4], ["2", 3]]变为[1: 3, 0: 4, 2: 3],这样就不用修改第四个教程的代码了
def Get_dic(rdd_list):
res = []
for elm in rdd_list:
tmp = elm[0]: elm[1]
res.append(tmp)
return json.dumps(res)


def sendmsg(rdd):
if rdd.count != 0:
msg = Get_dic(rdd.collect())
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers=localhost:9092)
producer.send("result", msg.encode(utf8))
# 很重要,不然不会更新
producer.flush()


if __name__ == __main__:
# 输入的四个参数分别代表着
# 1.zkQuorum为zookeeper地址
# 2.group为消费者所在的组
# 3.topics该消费者所消费的topics
# 4.numThreads开启消费topic线程的个数
if (len(sys.argv) < 5):
print("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
exit(1)
zkQuorum = sys.argv[1]
group = sys.argv[2]
topics = sys.argv[3]
numThreads = int(sys.argv[4])
print(group, topics)
KafkaWordCount(zkQuorum, group, topics, numThreads)

上述代码注释已经也很清楚了,下面在简要说明下:
1. 首先按每秒的频率读取Kafka消息;
2. 然后对每秒的数据执行wordcount算法,统计出0的个数,1的个数,2的个数;
3. 最后将上述结果封装成json发送给Kafka。

另外,需要注意,上面代码中有一行如下代码:

ssc.checkpoint(".") 

这行代码表示把检查点文件写入分布式文件系统HDFS,所以一定要事先启动Hadoop。如果没有启动Hadoop,则后面运行时会出现“拒绝连接”的错误提示。如果你还没有启动Hadoop,则可以现在在Ubuntu终端中,使用如下Shell命令启动Hadoop:

cd /home/hadoop/hadoop
./sbin/start-dfs.sh

建立pyspark项目

新建一个项目

cd /home/hadoop/spark
mkdir mycode
cp /home/hadoop/PycharmProjects/First/labproject/kafka_test.py /home/hadoop/spark/mycode

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_kafka_13

把这个加入到我们执行文件里面

/home/hadoop/spark/bin/spark-submit /home/hadoop/spark/mycode/kafka_test.py localhost:2181 1 sex 1

 

按照我们最初的想法,我们直接使用执行命令就可以执行了

./startup.sh

殊不知,就这样一步一步的走向深渊.......

下面是解决方法


华佗在世之妙手回春

1.首先我们发现执行之后,报错找不到这个文件路径,或者找不到这个文件,不存在这个文件

使用权限加入:chmod 777 startup.sh 或者 chmod +x startup.sh 给我们的执行文件加入可行性权限

2.接下来它依然报错,说无法找到,为什么呢?

注意要给你的Python加上可执行环境,我是使用的anaconda编译环境,anaconda比较的方便,推荐使用

sudo update-alternatives --install /usr/bin/python python /home/hadoop/anaconda3/bin/python 4

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_hadoop_14

3.版本不兼容导致的问题

根据报错的信息我们可以得出,我们的spark里面的有一个文件和我们之前加入的一个文件包有冲突,所以我们的解决方法是在删除这个包(net)

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_kafka_15

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_大数据_16

其他报错可以自己参考网络解法,有一个小小的建议,遇到报错之后,很多人都喜欢直接复制报错信息提交给百度君,但是!

不建议这样,因为每一步的过程可能别人和你不一样,或者你们的环境也不同,最正确的解决方法是,你自己阅读报错信息,安装报错来解决,可以参考CSDN里面解决方法。

再次执行

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_大数据_17

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_淘宝大数据_18

【全网详解】从0到1搭建双十一实时交易数据展示平台——Spark+Kafka构建实时分析系统_大数据_19

执行OK!到此为止,Spark Streaming程序编写完成,下篇文章将分析如何处理得到的最终结果。

结果展示之移花接木

做好了充分的准备工作,直接可以贴代码运行了!

web展示数据

数据是动态的,不断产生,因此利用Flask-SocketIO实时推送数据 socket.io.js实时获取数据 highlights.js展示数据

目录结构:

kafka-exp
├── app.py
├── static
│   └── js
│       ├── exporting.js
│       ├── highcharts.js
│       ├── jquery-3.1.1.min.js
│       ├── socket.io.js
│       └── socket.io.js.map
└── templates
     └── index.html

app.py(直接运行)

import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer
#因为第一步骤安装好了flask,所以这里可以引用

app = Flask(__name__)
app.config[SECRET_KEY] = secret!
socketio = SocketIO(app)
thread = None
# 实例化一个consumer,接收topic为result的消息
consumer = KafkaConsumer(result)

# 一个后台线程,持续接收Kafka消息,并发送给客户端浏览器
def background_thread():
girl = 0
boy = 0
for msg in consumer:
data_json = msg.value.decode(utf8)
data_list = json.loads(data_json)
for data in data_list:
if 0 in data.keys():
girl = data[0]
elif 1 in data.keys():
boy = data[1]
else:
continue
result = str(girl) + , + str(boy)
print(result)
socketio.emit(test_message,data:result)
socketio.sleep(1)


# 客户端发送connect事件时的处理函数
@socketio.on(test_connect)
def connect(message):
print(message)
global thread
if thread is None:
# 单独开启一个线程给客户端发送数据
thread = socketio.start_background_task(target=background_thread)
socketio.emit(connected, data: Connected)

# 通过访问http://127.0.0.1:5000/访问index.html
@app.route("/")
def handle_mes():
return render_template("index.html")

# main函数
if __name__ == __main__:
socketio.run(app,debug=True)

index.html

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>DashBoard</title>
<script src="static/js/socket.io.js"></script>
<script src="static/js/jquery-3.1.1.min.js"></script>
<script src="static/js/highcharts.js"></script>
<script src="static/js/exporting.js"></script>
<script type="text/javascript" charset="utf-8">
var socket = io.connect(http:// + document.domain + : + location.port);
socket.on(connect, function()
socket.emit(test_connect, data: I\\m connected!);
);

socket.on(test_message,function(message)
console.log(message);
var obj = eval(message);
var result = obj["data"].split(",");
$(#girl).html(result[0]);
$(#boy).html(result[1]);
);

socket.on(connected,function()
console.log(connected);
);

socket.on(disconnect, function ()
console.log(disconnect);
);
</script>
</head>
<body>
<div>
<b>Girl: </b><b id="girl"></b>
<b>Boy: </b><b id="boy"></b>
</div>
<div id="container" style="width: 600px;height:400px;"></div>

<script type="text/javascript">
$(document).ready(function ()
Highcharts.setOptions(
global:
useUTC: false

);

Highcharts.chart(container,
chart:
type: spline,
animation: Highcharts.svg, // dont animate in old IE
marginRight: 10,
events:
load: function ()

// set up the updating of the chart each second
var series1 = this.series[0];
var series2 = this.series[1];
setInterval(function ()
var x = (new Date()).getTime(), // current time
count1 = $(#girl).text();
y = parseInt(17.Flink--练习--双十一实时交易大屏需求数据实现步骤代码实现

从0到1搭建大数据平台之开篇

从0到1Flink的成长之路-Flink Action 综合案例

从0到1Flink的成长之路-Flink Action 综合案例

StructredStreaming+Kafka+Mysql(Spark实时计算| 天猫双十一实时报表分析)

StructredStreaming+Kafka+Mysql(Spark实时计算| 天猫双十一实时报表分析)