zookeeper + kafka + OpenRestry + Lua + Apache Druid实现日志收集与分析
Posted DM搬运工
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper + kafka + OpenRestry + Lua + Apache Druid实现日志收集与分析相关的知识,希望对你有一定的参考价值。
前言:本文基于springboot分布式系统实现日志的收集与分析,多用于电商项目的秒杀等热点数据。文章中的内容涉及zookeeper(注册中心)、kafka(队列)、Lua语言(日志收集)以及Apache Druid(实时分析)等热门技术。
一、zookeeper安装
注意:安装zookeeper前,需确保已安装jdk1.8_92以上到虚拟机!安装jdk参考上一篇文章 Linux安装jkd1.8。
下载地址:https://zookeeper.apache.org/releases.html
1、上传安装包到指定目录下
将安装包上传到/usr/local/zookeeper目录下并解压:
2、创建data和logs文件夹用于存放数据和日志
[root@localhost apache-zookeeper-3.5.9-bin]# mkdir data
[root@localhost apache-zookeeper-3.5.9-bin]# mkdir logs
3、重命名配置文件并修改配置文件
注意:zookeeper默认加载zoo.cfg文件
重命名:
[root@localhost conf]# pwd
/usr/local/zookeeper/apache-zookeeper-3.5.9-bin/conf
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg
修改zoo.cfg文件,修改数据和日志位置:
[root@localhost conf]# vim zoo.cfg
4、启动zookeeper
[root@localhost bin]# ./zkServer.sh start
5、检查是否启动成功
[root@localhost bin]# ./zkServer.sh status
二、kafka安装
下载地址:http://kafka.apache.org/downloads
1、上传安装包到指定目录
将安装包上传到/usr/local/kafka目录下并解压:
[root@localhost kafka]# tar -xvf kafka_2.13-3.0.0.tgz
2、创建日志存放目录
在解压目录/usr/local/kafka/kafka_2.13-3.0.0 下创建:
[root@localhost kafka_2.13-3.0.0]# pwd
/usr/local/kafka/kafka_2.13-3.0.0
[root@localhost kafka_2.13-3.0.0]# mkdir logs
3、修改配置文件
[root@localhost kafka_2.13-3.0.0]# vim config/server.properties
参数介绍:
listeners:
- localhost : 只监听本机的地址请求, 客户端也只能用 localhost 来请求
- 127.0.0.1 : 同localhost, 在请求上可能有与区分 , 看client的请求吧 . 客户端也只能用127.0.0.1来请求
- 192.168.0.1 : 建议不要用这个 , 局域网不一定是 192.168 段的.
- 0.0.0.0 : 本机的所有地址都监听 , 包含 localhost , 127.0.0.1, 及不同网卡的所有ip地址 , 都监听 .
advertised.listeners:
- 这个是对外提供的地址 , 当client请求到kafka时, 会分发这个地址.
- 有三个地方用到: 集群内其他的broker,生产者,消费者
- 可以不填 , 不填就默认用 listeners 的地址.
4、启动kafka并验证是否成功
1、启动
[root@localhost kafka_2.13-3.0.0]# bin/kafka-server-start.sh config/server.properties
2、验证
[root@localhost kafka_2.13-3.0.0]# ps -ef |grep kafka
出现如下内容表示启动成功!
kafka快速入门请参考文档:https://kafka.apache.org/quickstart
1)下面来简单创建一个主题(即队列 itemaccess ),为下文中的日志收集做准备:
[root@localhost kafka_2.13-3.0.0]# pwd
/usr/local/kafka/kafka_2.13-3.0.0
#创建主题
[root@localhost kafka_2.13-3.0.0]# bin/kafka-topics.sh --bootstrap-server 192.168.8.116:9092 --create --topic itemaccess --partitions 2 --replication-factor 1
#查看已创建主题
[root@localhost kafka_2.13-3.0.0]# bin/kafka-topics.sh --bootstrap-server 192.168.8.116:9092 --list
itemaccess
2)启动消费端
[root@localhost kafka_2.13-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.116:9092 --topic itemaccess
三、日志收集
1、OpenRestry安装
OpenResty 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。OpenResty 通过lua脚本扩展nginx功能,可提供负载均衡、请求路由、安全认证、服务鉴权、流量控制与日志监控等服务。
关于OpenRestry的学习,可以参考:http://openresty.org/cn/
1.1、下载并解压安装包
在目录/tmp下下载安装包:
[root@localhost tmp]# wget https://openresty.org/download/openresty-1.11.2.5.tar.gz
解压:
[root@localhost tmp]# tar -xvf https://openresty.org/download/openresty-1.11.2.5.tar.gz
1.2、安装
进入到解压目录进行安装,依次执行以下命令:
[root@localhost tmp]# cd openresty-1.11.2.5/
[root@localhost openresty-1.11.2.5]# ./configure --prefix=/usr/local/openresty --with-luajit --without-http_redis2_module --with-http_stub_status_module --with-http_v2_module --with-http_gzip_static_module --with-http_sub_module
[root@localhost openresty-1.11.2.5]# make
[root@localhost openresty-1.11.2.5]# make install
安装完成后,软件会安装到/usr/local/openresty,这里面会包含nginx。
1.3、配置环境变量并刷新配置
配置:
[root@localhost openresty-1.11.2.5]# vim /etc/profile
刷新:
[root@localhost openresty-1.11.2.5]# source /etc/profile
1.4、测试是否可用
1)在/usr/local目录下创建web/items目录:
[root@localhost local]# pwd
/usr/local
[root@localhost local]# mkdir -p web/items
2)下载百度网页到web目录下:
[root@localhost local]# cd web/items
[root@localhost items]# pwd
/usr/local/web/items
[root@localhost items]# wget www.baidu.com
3)修改/usr/local/openresty/nginx/conf/下的nginx.conf配置文件
[root@localhost conf]# pwd
/usr/local/openresty/nginx/conf
[root@localhost conf]# vim nginx.conf
加入如下内容:
4)启动nginx,并访问测试
[root@localhost conf]# pwd
/usr/local/openresty/nginx/conf
[root@localhost conf]# nginx
访问:http://192.168.8.116:8081/items/index.html
2、Lua日志收集
使用Lua实现日志收集,并向Kafka发送访问的详情页信息,此时我们需要安装一个依赖组件lua-restry-kafka。关于lua-restry-kafka的下载和使用,可以参考https://github.com/doujiang24/lua-resty-kafka
日志收集流程:
用户请求/web/items/1.html,进入到nginx第1个location中,在该location中向Kafka发送请求日志信息,并将请求中的/web去掉,跳转到另一个location中,并查找本地文件,这样既可以完成日志收集,也能完成文件的访问。
2.1、解压
将下载好的lua-resty-kafka-master.zip文件上传到/usr/local/openrestry目录下,并解压。
1)安装unzip命令
[root@localhost openresty]# yum install -y unzip
2)解压
[root@localhost openresty]# pwd
/usr/local/openresty
[root@localhost openresty]# ls
bin COPYRIGHT luajit lualib lua-resty-kafka-master.zip nginx openssl111 pcre pod resty.index site zlib
[root@localhost openresty]# unzip lua-resty-kafka-master.zip
2.2、修改nginx的配置
修改nginx.conf,在配置文件中指定lua-resty-kafka的库文件位置:
[root@localhost conf]# pwd
/usr/local/openresty/nginx/conf
[root@localhost conf]# vim nginx.conf
2.3、日志收集
用户访问页面的时候,需要实现日志收集,日志收集采用Lua将当前访问信息发布到Kafka中,因此这里要实现Kafka消息生产者。
我们定义一个消息格式:
{
"actime": "2021-11-10 16:25:30",
"uri": "http://192.168.8.116/items/index.html",
"ip": "119.123.33.231",
"token": "Bearer JAVAITCAST"
}
1)生产者脚本
在/usr/local/openresty/nginx/lua目录下创建一个lua脚本items-access.lua:
[root@localhost lua]# pwd
/usr/local/openresty/nginx/lua
[root@localhost lua]# vim items-access.lua
脚本内容如下:
--引入json解析库
local cjson = require("cjson")
--kafka依赖库
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
--配置kafka的链接地址
local broker_list = {
{ host = "192.168.8.116", port = 9092 }
}
--创建生产者
local pro = producer:new(broker_list,{ producer_type="async"})
--获取IP
local headers=ngx.req.get_headers()
local ip=headers["X-REAL-IP"] or headers["X_FORWARDED_FOR"] or ngx.var.remote_addr or "0.0.0.0"
--定义消息内容
local logjson = {}
logjson["uri"]=ngx.var.uri
logjson["ip"]=ip
logjson["token"]="Bearer TEST"
logjson["actime"]=os.date("%Y-%m-%d %H:%m:%S")
--发送消息
local offset, err = pro:send("itemaccess", nil, cjson.encode(logjson))
--去掉访问前缀
local uri = ngx.var.uri
uri = string.gsub(uri,"/web","")
--页面跳转
ngx.exec(uri)
2.4、修改nginx配置
[root@localhost conf]# pwd
/usr/local/openresty/nginx/conf
[root@localhost conf]# vim nginx.conf
重启nginx!
2.5、日志收集测试
请求地址:http://192.168.8.116:8081/web/items/index.html
四、 Apache Druid日志实时分析
Apache Druid 是一个分布式的、支持实时多维 OLAP 分析的数据处理系统。它既支持高速的数据实时摄入,也支持实时且灵活的多维数据分析查询。因此 Druid 最常用的场景是大数据背景下、灵活快速的多维 OLAP 分析。 另外,Druid 还有一个关键的特点:它支持根据时间戳对数据进行预聚合摄入和聚合分析,因此也有用户经常在有时序数据处理分析的场景中用到它。
注:需要JDK:java8(8u92+),同时需要笔记本大约 4 个 CPU 和 16 G的内存来运行!
1、下载安装包
下载地址:https://druid.apache.org/downloads.html
快速入门:https://druid.apache.org/docs/latest/tutorials/index.html
2、安装
1)将文件上传至/usr/local/apache-druid目录下:
2)解压安装
[root@localhost apache-druid]# tar -xvf apache-druid-0.22.0-bin.tar.gz
3、修改 Apache Druid自带的zookeeper的端口
问题:在单机部署的时候会和原先安装的zookeeper端口2181冲突,如果两个一起启动,那就就需要修改Druid或者zookeeper端口为2182。
1)查看执行文件
[root@localhost bin]# pwd
/usr/local/apache-druid/apache-druid-0.22.0/bin
[root@localhost bin]# vim start-micro-quickstart
2)看到显示加载的conf文件是micro-quickstart.conf,直接查看该conf文件
[root@localhost apache-druid-0.22.0]# pwd
/usr/local/apache-druid/apache-druid-0.22.0
[root@localhost apache-druid-0.22.0]# vim conf/supervise/single-server/micro-quickstart.conf
发现先去端口验证verify bin/verify-default-ports,然后执行in/run-zk conf。
3) 修改bin/verify-default-ports文件中的端口
[root@localhost apache-druid-0.22.0]# pwd
/usr/local/apache-druid/apache-druid-0.22.0
[root@localhost apache-druid-0.22.0]# vim bin/verify-default-ports
将@ports数组中的2181改为2182。
4)将zookeeper中的端口改为2182,修改zoo.cfg文件
[root@localhost zk]# pwd
/usr/local/apache-druid/apache-druid-0.22.0/conf/zk
[root@localhost zk]# ls
jvm.config log4j2.xml zoo.cfg
[root@localhost zk]# vim zoo.cfg
4、启动单机版Apache Druid
[root@localhost apache-druid-0.22.0]# ./bin/start-micro-quickstart
启动后访问:http://192.168.3.10:8888/,默认的端口是8888
5、测试
5.1、离线数据导入
- 点击Load data->Local disk->Connect data
2)导入数据
我们要导入的数据在/tmp/apache-druid-0.21.1/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz(在安装目录下),需要把该文件的相对路径填写到右边表单中,再点击Apply。
3) 解析数据
在上一个步骤上点击Next:Parse data
4) 解析时间
在上一个步骤上点击Next: Parse time,Apache Druid要求每条数据都有一个time列,如果我们导入的数据没有该列,Apache Druid会自动创建该列!
5) 数据分区设置
点击下一步一直到Partition,Segment granularity选择day
- Segment granularity:分片文件每个segment包含的时间戳范围
- Partitioning type:分区类型
- Max rows per segment:用于分片。确定每个段中的行数。
6) 设置数据源
将默认名称从 更改 wikiticker-2015-09-12-sampled为wikipedia
7)提交数据
5.2、实时数据摄入
参考地址:https://druid.apache.org/docs/latest/tutorials/tutorial-kafka.html
1)加载数据
2)配置Kafka源
topic设置为上文kafka所创建的itemaccess。
3)配置数据源名字
其他的步骤和之前文件导入一样。
查询:
6、JDBC查询druid
1、导入依赖
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<version>1.15.0</version>
</dependency>
2、测试
public static void main(String[] args) throws Exception{
//链接地址
String url = "jdbc:avatica:remote:url=http://192.168.8.116:8082/druid/v2/sql/avatica/";
AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(url);
//SQL语句,查询2021-11-10 21:50:30之后的访问uri和访问数量
String sql="SELECT uri,count(*) AS \\"viewcount\\" FROM(SELECT * FROM \\"itemlogs\\" WHERE __time>'2021-11-10 21:50:30' ORDER BY __time DESC) GROUP BY uri LIMIT 100";
//创建Statment
AvaticaStatement statement = connection.createStatement();
//执行查询
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
//获取uri
String uri = resultSet.getString("uri");
String viewcount = resultSet.getString("viewcount");
System.out.println(uri+"--------->"+viewcount);
}
}
7、拓展
Druid的时区和国内时区不一致,会比我们的少8个小时,我们需要修改配置文件,批量将时间+8,代码如下:
[root@k8s-master1 apache-druid-0.21.1]# sed -i "s/Duser.timezone=UTC/Duser.timezone=UTC+8/g" `grep Duser.timezone=UTC -rl ./`
五、springboot整合druid
采用elastic-job定时器来实时查询热点数据。
1、导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<!-- ElasticJobAutoConfiguration自动配置类作用-->
<dependency>
<groupId>com.github.kuhn-he</groupId>
<artifactId>elastic-job-lite-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.12</version>
</dependency>
<!-- redis 使用-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
<!--分布式锁-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.0</version>
</dependency>
2、yaml配置
server:
port: 18084
spring:
application:
name: monitor
datasource:
driver-class-name: org.apache.calcite.avatica.remote.Driver
url: jdbc:avatica:remote:url=http://192.168.3.10:8082/druid/v2/sql/avatica/
type: com.alibaba.druid.pool.DruidDataSource
cloud:
nacos:
config:
file-extension: yaml
server-addr: 192.168.3.10:8848
discovery:
#Nacos的注册地址
server-addr: 192.168.3.10:8848
redis:
cluster:
nodes:
- 192.168.3.10:7001
- 192.168.3.10:7002
- 192.168.3.10:7003
- 192.168.3.10:7004
- 192.168.3.10:7005
- 192.168.3.10:7006
#elasticjob
elaticjob:
zookeeper:
server-lists: 192.168.3.10:2181
namespace: monitortask
#Druid
druidurl: jdbc:avatica:remote:url=http://192.168.3.10:8082/druid/v2/sql/avatica/
logging:
pattern:
console: "%msg%n"
3、热点数据查询
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.calcite.avatica.AvaticaConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/*****
* 热点数据查询
****/
@Component
public class MonitorItemsAccess {
@Value("${druidurl}")
private String druidurl;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private DruidDataSource dataSource;
/******
* 定义热点数据标准:
* 1.某一件商品访问量>N
* 2.最近N小时
*/
public List<String> loadData() throws Exception{
//获取连接对象
//Connection connection = (AvaticaConnection) DriverManager.getConnection(druidurl);
Connection connection =dataSource.getConnection();
//Statement
Statement statement = connection以上是关于zookeeper + kafka + OpenRestry + Lua + Apache Druid实现日志收集与分析的主要内容,如果未能解决你的问题,请参考以下文章