zookeeper + kafka + OpenRestry + Lua + Apache Druid实现日志收集与分析

Posted DM搬运工

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper + kafka + OpenRestry + Lua + Apache Druid实现日志收集与分析相关的知识,希望对你有一定的参考价值。

前言:本文基于springboot分布式系统实现日志的收集与分析,多用于电商项目的秒杀等热点数据。文章中的内容涉及zookeeper(注册中心)、kafka(队列)、Lua语言(日志收集)以及Apache Druid(实时分析)等热门技术。

一、zookeeper安装

注意:安装zookeeper前,需确保已安装jdk1.8_92以上到虚拟机!安装jdk参考上一篇文章 Linux安装jkd1.8

下载地址:https://zookeeper.apache.org/releases.html

1、上传安装包到指定目录下

将安装包上传到/usr/local/zookeeper目录下并解压:

2、创建data和logs文件夹用于存放数据和日志

[root@localhost apache-zookeeper-3.5.9-bin]#  mkdir data
[root@localhost apache-zookeeper-3.5.9-bin]#  mkdir logs


3、重命名配置文件并修改配置文件

注意:zookeeper默认加载zoo.cfg文件

重命名:

[root@localhost conf]# pwd
/usr/local/zookeeper/apache-zookeeper-3.5.9-bin/conf
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg

修改zoo.cfg文件,修改数据和日志位置:

[root@localhost conf]# vim zoo.cfg 

4、启动zookeeper

[root@localhost bin]# ./zkServer.sh start


5、检查是否启动成功

[root@localhost bin]# ./zkServer.sh status

二、kafka安装

下载地址:http://kafka.apache.org/downloads

1、上传安装包到指定目录

将安装包上传到/usr/local/kafka目录下并解压:

[root@localhost kafka]# tar -xvf kafka_2.13-3.0.0.tgz 

2、创建日志存放目录

在解压目录/usr/local/kafka/kafka_2.13-3.0.0 下创建:

[root@localhost kafka_2.13-3.0.0]# pwd
/usr/local/kafka/kafka_2.13-3.0.0
[root@localhost kafka_2.13-3.0.0]# mkdir logs

3、修改配置文件

[root@localhost kafka_2.13-3.0.0]# vim config/server.properties 


参数介绍:

listeners:

  • localhost : 只监听本机的地址请求, 客户端也只能用 localhost 来请求
  • 127.0.0.1 : 同localhost, 在请求上可能有与区分 , 看client的请求吧 . 客户端也只能用127.0.0.1来请求
  • 192.168.0.1 : 建议不要用这个 , 局域网不一定是 192.168 段的.
  • 0.0.0.0 : 本机的所有地址都监听 , 包含 localhost , 127.0.0.1, 及不同网卡的所有ip地址 , 都监听 .

advertised.listeners:

  • 这个是对外提供的地址 , 当client请求到kafka时, 会分发这个地址.
  • 有三个地方用到: 集群内其他的broker,生产者,消费者
  • 可以不填 , 不填就默认用 listeners 的地址.

4、启动kafka并验证是否成功

1、启动

[root@localhost kafka_2.13-3.0.0]# bin/kafka-server-start.sh config/server.properties 

2、验证

[root@localhost kafka_2.13-3.0.0]# ps -ef |grep kafka

出现如下内容表示启动成功!

kafka快速入门请参考文档:https://kafka.apache.org/quickstart

1)下面来简单创建一个主题(即队列 itemaccess ),为下文中的日志收集做准备:

[root@localhost kafka_2.13-3.0.0]# pwd
/usr/local/kafka/kafka_2.13-3.0.0
#创建主题
[root@localhost kafka_2.13-3.0.0]# bin/kafka-topics.sh --bootstrap-server 192.168.8.116:9092 --create --topic itemaccess --partitions 2 --replication-factor 1
#查看已创建主题
[root@localhost kafka_2.13-3.0.0]# bin/kafka-topics.sh --bootstrap-server 192.168.8.116:9092 --list
itemaccess


2)启动消费端

[root@localhost kafka_2.13-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.116:9092 --topic itemaccess

三、日志收集

1、OpenRestry安装

OpenResty 是一个基于 NginxLua高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。OpenResty 通过lua脚本扩展nginx功能,可提供负载均衡、请求路由、安全认证、服务鉴权、流量控制与日志监控等服务。

关于OpenRestry的学习,可以参考:http://openresty.org/cn/

1.1、下载并解压安装包

在目录/tmp下下载安装包:

[root@localhost tmp]# wget https://openresty.org/download/openresty-1.11.2.5.tar.gz

解压:

[root@localhost tmp]# tar -xvf https://openresty.org/download/openresty-1.11.2.5.tar.gz

1.2、安装

进入到解压目录进行安装,依次执行以下命令:

[root@localhost tmp]# cd openresty-1.11.2.5/
[root@localhost openresty-1.11.2.5]# ./configure --prefix=/usr/local/openresty --with-luajit --without-http_redis2_module --with-http_stub_status_module --with-http_v2_module --with-http_gzip_static_module --with-http_sub_module
[root@localhost openresty-1.11.2.5]# make
[root@localhost openresty-1.11.2.5]# make install

安装完成后,软件会安装到/usr/local/openresty,这里面会包含nginx。

1.3、配置环境变量并刷新配置

配置:

[root@localhost openresty-1.11.2.5]# vim /etc/profile


刷新:

[root@localhost openresty-1.11.2.5]# source /etc/profile

1.4、测试是否可用

1)在/usr/local目录下创建web/items目录:

[root@localhost local]# pwd
/usr/local
[root@localhost local]# mkdir -p web/items

2)下载百度网页到web目录下:

[root@localhost local]# cd web/items
[root@localhost items]# pwd
/usr/local/web/items
[root@localhost items]# wget www.baidu.com

3)修改/usr/local/openresty/nginx/conf/下的nginx.conf配置文件

[root@localhost conf]# pwd
/usr/local/openresty/nginx/conf
[root@localhost conf]# vim nginx.conf

加入如下内容:

4)启动nginx,并访问测试

[root@localhost conf]# pwd
/usr/local/openresty/nginx/conf
[root@localhost conf]# nginx

访问:http://192.168.8.116:8081/items/index.html

2、Lua日志收集

使用Lua实现日志收集,并向Kafka发送访问的详情页信息,此时我们需要安装一个依赖组件lua-restry-kafka。关于lua-restry-kafka的下载和使用,可以参考https://github.com/doujiang24/lua-resty-kafka


日志收集流程:

​ 用户请求/web/items/1.html,进入到nginx第1个location中,在该location中向Kafka发送请求日志信息,并将请求中的/web去掉,跳转到另一个location中,并查找本地文件,这样既可以完成日志收集,也能完成文件的访问。

2.1、解压

将下载好的lua-resty-kafka-master.zip文件上传到/usr/local/openrestry目录下,并解压。
1)安装unzip命令

[root@localhost openresty]# yum install -y unzip

2)解压

[root@localhost openresty]# pwd
/usr/local/openresty
[root@localhost openresty]# ls
bin  COPYRIGHT  luajit  lualib  lua-resty-kafka-master.zip  nginx  openssl111  pcre  pod  resty.index  site  zlib
[root@localhost openresty]# unzip lua-resty-kafka-master.zip 

2.2、修改nginx的配置

修改nginx.conf,在配置文件中指定lua-resty-kafka的库文件位置:

[root@localhost conf]# pwd
/usr/local/openresty/nginx/conf
[root@localhost conf]# vim nginx.conf

2.3、日志收集

用户访问页面的时候,需要实现日志收集,日志收集采用Lua将当前访问信息发布到Kafka中,因此这里要实现Kafka消息生产者。

我们定义一个消息格式:

{
  "actime": "2021-11-10 16:25:30",
  "uri": "http://192.168.8.116/items/index.html",
  "ip": "119.123.33.231",
  "token": "Bearer JAVAITCAST"
}

1)生产者脚本
在/usr/local/openresty/nginx/lua目录下创建一个lua脚本items-access.lua:

[root@localhost lua]# pwd
/usr/local/openresty/nginx/lua
[root@localhost lua]# vim items-access.lua

脚本内容如下:

--引入json解析库
local cjson = require("cjson")
--kafka依赖库
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
--配置kafka的链接地址
local broker_list = {
      { host = "192.168.8.116", port = 9092 }
}
--创建生产者
local pro = producer:new(broker_list,{ producer_type="async"})

--获取IP
local headers=ngx.req.get_headers()
local ip=headers["X-REAL-IP"] or headers["X_FORWARDED_FOR"] or ngx.var.remote_addr or "0.0.0.0"

--定义消息内容
local logjson = {}
logjson["uri"]=ngx.var.uri
logjson["ip"]=ip
logjson["token"]="Bearer TEST"
logjson["actime"]=os.date("%Y-%m-%d %H:%m:%S")

--发送消息
local offset, err = pro:send("itemaccess", nil, cjson.encode(logjson))

--去掉访问前缀
local uri = ngx.var.uri
uri = string.gsub(uri,"/web","")
--页面跳转
ngx.exec(uri)

2.4、修改nginx配置

[root@localhost conf]# pwd
/usr/local/openresty/nginx/conf
[root@localhost conf]# vim nginx.conf


重启nginx!

2.5、日志收集测试

请求地址:http://192.168.8.116:8081/web/items/index.html

四、 Apache Druid日志实时分析

Apache Druid 是一个分布式的、支持实时多维 OLAP 分析的数据处理系统。它既支持高速的数据实时摄入,也支持实时且灵活的多维数据分析查询。因此 Druid 最常用的场景是大数据背景下、灵活快速的多维 OLAP 分析。 另外,Druid 还有一个关键的特点:它支持根据时间戳对数据进行预聚合摄入和聚合分析,因此也有用户经常在有时序数据处理分析的场景中用到它。

注:需要JDK:java8(8u92+),同时需要笔记本大约 4 个 CPU 和 16 G的内存来运行!

1、下载安装包

下载地址:https://druid.apache.org/downloads.html
快速入门:https://druid.apache.org/docs/latest/tutorials/index.html

2、安装

1)将文件上传至/usr/local/apache-druid目录下:

2)解压安装

[root@localhost apache-druid]# tar -xvf apache-druid-0.22.0-bin.tar.gz 

3、修改 Apache Druid自带的zookeeper的端口

问题:在单机部署的时候会和原先安装的zookeeper端口2181冲突,如果两个一起启动,那就就需要修改Druid或者zookeeper端口为2182。

1)查看执行文件

[root@localhost bin]# pwd
/usr/local/apache-druid/apache-druid-0.22.0/bin
[root@localhost bin]# vim start-micro-quickstart 


2)看到显示加载的conf文件是micro-quickstart.conf,直接查看该conf文件

[root@localhost apache-druid-0.22.0]# pwd
/usr/local/apache-druid/apache-druid-0.22.0
[root@localhost apache-druid-0.22.0]# vim conf/supervise/single-server/micro-quickstart.conf


发现先去端口验证verify bin/verify-default-ports,然后执行in/run-zk conf。

3) 修改bin/verify-default-ports文件中的端口

[root@localhost apache-druid-0.22.0]# pwd
/usr/local/apache-druid/apache-druid-0.22.0
[root@localhost apache-druid-0.22.0]# vim bin/verify-default-ports


将@ports数组中的2181改为2182。

4)将zookeeper中的端口改为2182,修改zoo.cfg文件

[root@localhost zk]# pwd
/usr/local/apache-druid/apache-druid-0.22.0/conf/zk
[root@localhost zk]# ls
jvm.config  log4j2.xml  zoo.cfg
[root@localhost zk]# vim zoo.cfg 

4、启动单机版Apache Druid

[root@localhost apache-druid-0.22.0]# ./bin/start-micro-quickstart

启动后访问:http://192.168.3.10:8888/,默认的端口是8888

5、测试

5.1、离线数据导入

  1. 点击Load data->Local disk->Connect data

    2)导入数据

我们要导入的数据在/tmp/apache-druid-0.21.1/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz(在安装目录下),需要把该文件的相对路径填写到右边表单中,再点击Apply。

3) 解析数据
在上一个步骤上点击Next:Parse data

4) 解析时间
在上一个步骤上点击Next: Parse time,Apache Druid要求每条数据都有一个time列,如果我们导入的数据没有该列,Apache Druid会自动创建该列!

5) 数据分区设置
点击下一步一直到Partition,Segment granularity选择day

  • Segment granularity:分片文件每个segment包含的时间戳范围
  • Partitioning type:分区类型
  • Max rows per segment:用于分片。确定每个段中的行数。

6) 设置数据源

将默认名称从 更改 wikiticker-2015-09-12-sampled为wikipedia

7)提交数据

5.2、实时数据摄入

参考地址:https://druid.apache.org/docs/latest/tutorials/tutorial-kafka.html
1)加载数据

2)配置Kafka源

topic设置为上文kafka所创建的itemaccess。

3)配置数据源名字

其他的步骤和之前文件导入一样。


查询:

6、JDBC查询druid

1、导入依赖

<dependency>
    <groupId>org.apache.calcite.avatica</groupId>
    <artifactId>avatica-core</artifactId>
    <version>1.15.0</version>
</dependency>

2、测试

public static void main(String[] args) throws Exception{
    //链接地址
    String url = "jdbc:avatica:remote:url=http://192.168.8.116:8082/druid/v2/sql/avatica/";
    AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(url);

    //SQL语句,查询2021-11-10 21:50:30之后的访问uri和访问数量
    String sql="SELECT uri,count(*) AS \\"viewcount\\" FROM(SELECT * FROM \\"itemlogs\\" WHERE __time>'2021-11-10 21:50:30' ORDER BY __time DESC) GROUP BY uri LIMIT 100";

    //创建Statment
    AvaticaStatement statement = connection.createStatement();

    //执行查询
    ResultSet resultSet = statement.executeQuery(sql);

    while (resultSet.next()) {
        //获取uri
        String uri = resultSet.getString("uri");
        String viewcount = resultSet.getString("viewcount");
        System.out.println(uri+"--------->"+viewcount);
    }
}

7、拓展

Druid的时区和国内时区不一致,会比我们的少8个小时,我们需要修改配置文件,批量将时间+8,代码如下:

[root@k8s-master1 apache-druid-0.21.1]# sed -i "s/Duser.timezone=UTC/Duser.timezone=UTC+8/g" `grep Duser.timezone=UTC -rl ./`

五、springboot整合druid

采用elastic-job定时器来实时查询热点数据。

1、导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
</dependency>

<!-- ElasticJobAutoConfiguration自动配置类作用-->
<dependency>
    <groupId>com.github.kuhn-he</groupId>
    <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
    <version>2.1.5</version>
</dependency>

<dependency>
    <groupId>org.apache.calcite.avatica</groupId>
    <artifactId>avatica-core</artifactId>
    <version>1.15.0</version>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.12</version>
</dependency>
<!-- redis 使用-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.1.4.RELEASE</version>
</dependency>

<!--分布式锁-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.11.0</version>
</dependency>

2、yaml配置

server:
  port: 18084
spring:
  application:
    name: monitor
  datasource:
    driver-class-name: org.apache.calcite.avatica.remote.Driver
    url: jdbc:avatica:remote:url=http://192.168.3.10:8082/druid/v2/sql/avatica/
    type: com.alibaba.druid.pool.DruidDataSource
  cloud:
    nacos:
      config:
        file-extension: yaml
        server-addr: 192.168.3.10:8848
      discovery:
        #Nacos的注册地址
        server-addr: 192.168.3.10:8848
  redis:
    cluster:
      nodes:
        - 192.168.3.10:7001
        - 192.168.3.10:7002
        - 192.168.3.10:7003
        - 192.168.3.10:7004
        - 192.168.3.10:7005
        - 192.168.3.10:7006

#elasticjob
elaticjob:
  zookeeper:
    server-lists: 192.168.3.10:2181
    namespace: monitortask

#Druid
druidurl: jdbc:avatica:remote:url=http://192.168.3.10:8082/druid/v2/sql/avatica/

logging:
  pattern:
    console: "%msg%n"

3、热点数据查询

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.calcite.avatica.AvaticaConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/*****
 * 热点数据查询
 ****/
@Component
public class MonitorItemsAccess {

    @Value("${druidurl}")
    private String druidurl;

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private DruidDataSource dataSource;

    /******
     * 定义热点数据标准:
     *      1.某一件商品访问量>N
     *      2.最近N小时
     */
    public List<String> loadData() throws Exception{
        //获取连接对象
        //Connection connection = (AvaticaConnection) DriverManager.getConnection(druidurl);
        Connection connection =dataSource.getConnection();
        //Statement
        Statement statement = connection以上是关于zookeeper + kafka + OpenRestry + Lua + Apache Druid实现日志收集与分析的主要内容,如果未能解决你的问题,请参考以下文章

启动zookeeper和kafka时 kafka无法启动或者闪退

zookeeper集群+kafka集群 部署

为kafka配置zookeeper

Kafka学习之路 Kafka在zookeeper中的存储

zookeeper集群+kafka集群 部署

kafka zookeeper 集群