基于kafka的日志收集分析平台

Posted Wangsh@

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于kafka的日志收集分析平台相关的知识,希望对你有一定的参考价值。

目录

基于kafka的日志收集分析平台架构图

 数据走向流程

一、项目目的

二、项目环境

三、项目步骤

准备好3台虚拟机搭建nginx集群

配置好三台nginx机器的静态ip地址,防止dhcp模式动态获得ip地址对我们服务器造成影响

三台机器都配置好dns

 dns解析顺序:

修改主机名

每一台机器上面写好域名解析

安装基本软件,解决依赖关系

安装时间同步服务

关闭防火墙

关闭selinux

nginx搭建

nginx配置文件修改

新建我们的配置文件

语法检测,检测配置文件语法是否正确

使用三台虚拟机搭建kafka和zookeeper集群

安装java和kafka

配置kafka

配置zookeeper

启动kafka

启动zookeeper

创建一个topic来测试kafka

创建topic : 

创建生产者

创建消费者

filebeat部署

yum安装filebeat

测试filebeat能否生产数据

启动filebeat服务

 接下来用kafka自带的消费者程序来测试一下我们能否消费到filebeat生产的nginxlog主题里面的数据

 可以到filebeat的记录数据的文件里面看一下消费者有没有成功消费到数据。(/var/lib/filebeat/registry/filebeat/)

 编写python脚本,模拟消费者消费数据,然后将所需字段提取出来整理后放入数据库里面

日志收集平台详细架构图:


基于kafka的日志收集分析平台架构图

 数据走向流程

一、项目目的

        主要是为了模拟企业在大数据背景下的日志收集、存储,分析,消费等流程。

二、项目环境

Windows10机器(测试用)、Linux(centos7)、nginx(1.20.1)、Filebeat(7.17.5)、kafka(1.12)、zookeeper(3.6.3)、Pycharm2020.3、mysql(5.7.34)

三、项目步骤

准备好3台虚拟机搭建nginx集群

配置好三台nginx机器的静态ip地址,防止dhcp模式动态获得ip地址对我们服务器造成影响

三台机器都配置好dns

 dns解析顺序:

      1、浏览器的缓存
      2、本地hosts文件  --linux(/etc/hosts)
      3、找本地域名服务器  -- linux(/etc/resolv.conf)

修改主机名

[root@nginx-kafka01 /]# cat /etc/hostname 
nginx-kafka01
[root@nginx-kafka02 ~]# cat /etc/hostname 
nginx-kafka02
[root@nginx-kafka03 ~]# cat /etc/hostname
nginx-kafka03

每一台机器上面写好域名解析

[root@nginx-kafka01 /]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.44.181 nginx-kafka01
192.168.44.182 nginx-kafka02
192.168.44.183 nginx-kafka03

安装基本软件,解决依赖关系

   yum install wget lsof vim -y

安装时间同步服务


    yum -y install chrony

设置开机自启,然后开机服务

	vim /etc/selinux/config
	SELINUX=disabled
systemctl enable chronyd
systemctl start chronyd

关闭防火墙

    [root@nginx-kafka01 ~]# systemctl stop firewalld
    [root@nginx-kafka01 ~]# systemctl disable firewalld

关闭selinux

selinux是linux里面的一个安全子系统,里面有许多关于安全的规则,很麻烦,会影响项目运行。

	vim /etc/selinux/config
	SELINUX=disabled

        selinux关闭 需要重启机器才能生效,可以看到selinux处于禁用状态

[root@nginx-kafka01 /]# getenforce
Disabled

nginx搭建

安装好epel源,本次nginx安装使用yum安装,以一台机器示例:

yum install epel-release -y
yum install  nginx -y

设置开机自启

systemctl enable nginx

启动nginx服务

systemctl start nginx

查看nginx是否启动成功

[root@nginx-kafka01 /]# ps aux | grep nginx
root       2098  0.0  0.0  40056   984 ?        Ss   7月24   0:00 nginx: master process /usr/sbinnginx
nginx      2179  0.0  0.0  40060  1180 ?        S    7月24   0:00 nginx: worker process

nginx配置文件修改

vim   nginx.conf

将 
   listen       80 default_server;
修改成:
   listen       80;

新建我们的配置文件

vim  /etc/nginx/conf.d/sc.conf

server 
    listen 80 default_server;
    server_name  www.sc.com;

    root         /usr/share/nginx/html;

    access_log  /var/log/nginx/sc/access.log main;

    location  / 

    

语法检测,检测配置文件语法是否正确

[root@nginx-kafka01 html]# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: [emerg] open() "/var/log/nginx/sc/access.log" failed (2: No such file or directory)
nginx: configuration file /etc/nginx/nginx.conf test failed
[root@nginx-kafka01 html]# mkdir /var/log/nginx/sc
[root@nginx-kafka01 html]# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful

#重新加载nginx
nginx -s  reload

使用三台虚拟机搭建kafka和zookeeper集群

以一台机器示例

安装java和kafka

yum install java wget  -y
wget   https://mirrors.bfsu.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz 

解压缩

tar  xf  kafka_2.12-2.8.1.tgz

配置kafka

修改config /server.properties:

设置broker节点,这代表这台kafka机器

broker.id=1
zookeeper.connect=192.168.44.181:2181,192.168.44.182:2181,192.168.44.183:2181

配置zookeeper

进入安装zookeeper的目录

将配置文件copy一份然后改名为zoo.cfg添加如下三行

server.1=192.168.0.94:3888:4888
server.2=192.168.0.95:3888:4888
server.3=192.168.0.96:3888:4888

3888和4888都是端口  一个用于数据传输,一个用于检验存活性和选举

创建/tmp/zookeeper目录 ,在目录中添加myid文件,文件内容就是本机指定的zookeeper id内容
如:在192.168.44.181机器上
echo 1 > /tmp/zookeeper/myid

myid里面的id号要和broker节点号一致,分别设置三台机器。

查看三台zookeeper的leader和follower情况

可以看到我设置的kafka02是leader,kafka01和kafka03是follower

启动kafka

bin/kafka-server-start.sh -daemon config/server.properties

启动zookeeper

[root@nginx-kafka01 bin]# ./zkCli.sh 

 此时我们应该看到三个brokers的id

[zk: localhost:2181(CONNECTED) 3] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, sc, zookeeper]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
[1, 2, 3]

创建broker

[zk: localhost:2181(CONNECTED) 3] create /sc/yy
Created /sc/yy
[zk: localhost:2181(CONNECTED) 4] ls /sc
[page, xx, yy]
[zk: localhost:2181(CONNECTED) 5] set /sc/yy 90
[zk: localhost:2181(CONNECTED) 6] get /sc/yy
90

创建一个topic来测试kafka

创建topic : 

bin/kafka-topics.sh --create --zookeeper 192.168.44.181:2181 --replication-factor 1 --partitions 1 --topic test

 效果图:

创建生产者

bin/kafka-console-producer.sh --broker-list 192.168.44.181:9092 --topic test

创建消费者

bin/kafka-console-consumer.sh --bootstrap-server 192.168.44.181:9092 --topic test

消费成功效果图:

消费者消费到了生产者产生的数据

filebeat部署

安装

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

编辑 vim/etc/yum.repos.d/fb.repo

[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
​type=rpm-md

yum安装filebeat

yum  install  filebeat -y

设置开机自启

systemctl enable filebeat

修改filebeat配置文件,filebeat的配置文件是yml格式的,

首先将filebeat的配置文件filebeat.yml备份一份为filebeat.yml.bak

[root@nginx-kafka01 filebeat]# cp filebeat.yml filebeat.yml.bak

然后再将filebeat.yml文件清空,加上我们自己配置的一些配置

filebeat.inputs:
- type: log
  # Change to true to enable this input configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/nginx/sc/access.log
#==========------------------------------kafka-----------------------------------
output.kafka:
  hosts: ["192.168.44.181:9092","192.168.44.182:9092","192.168.44.183:9092"]
  topic: nginxlog
  keep_alive: 10s

配置好了配置文件,就可以通过filebeat来收集nginx的日志了

测试filebeat能否生产数据

创建主题 :nginxlog(这个主题是我们在filebeat指定好的,filebeat会将生产的数据都吐到这个主题里面)

bin/kafka-topics.sh --create --zookeeper 192.168.44.181:2181 --replication-factor 3 --partitions 1 --topic nginxlog

启动filebeat服务

systemctl start  filebeat

可以看到,filebeat进程是启动成功了的

 接下来用kafka自带的消费者程序来测试一下我们能否消费到filebeat生产的nginxlog主题里面的数据

[root@nginx-kafka01 kafka_2.12-2.8.1]# bin/kafka-topics.sh --create --zookeeper 192.168.44.181:2181 --replication-factor 3 --partitions 1 --topic nginxlog

消费成功效果图:

 这个时候,我们可以刷新一下我们nginx的静态页面,没有问题的话,我们消费的数据会10秒刷新一次。

 效果图:

 

 可以到filebeat的记录数据的文件里面看一下消费者有没有成功消费到数据。(/var/lib/filebeat/registry/filebeat/)

[root@nginx-kafka01 filebeat]# less log.json 

 编写python脚本,模拟消费者消费数据,然后将所需字段提取出来整理后放入数据库里面

[root@nginx-kafka01 opt]# cat python_consumer.py 
import json
import requests
import time

taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
#查询ip地址的信息(省份和运营商isp),通过taobao网的接口
def resolv_ip(ip):
    response = requests.get(taobao_url+ip)
    if response.status_code == 200:
       tmp_dict = json.loads(response.text)
       prov = tmp_dict["data"]["region"]
       isp = tmp_dict["data"]["isp"]
       return prov,isp
    return None,None

#将日志里读取的格式转换为我们指定的格式
def trans_time(dt):
     #把字符串转成时间格式
    timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
    #timeStamp = int(time.mktime(timeArray))
    #把时间格式转成字符串
    new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)    
    return new_time

#从kafka里获取数据,清洗为我们需要的ip,时间,带宽
from pykafka import KafkaClient
client = KafkaClient(hosts="192.168.44.181:9092,192.168.44.182:9092,192.168.44.183:9092")
topic = client.topics['nginxlog'] 
balanced_consumer = topic.get_balanced_consumer(
  consumer_group='testgroup',
  auto_commit_enable=True,    
  zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
) 
consumer = topic.get_simple_consumer() 
for message in balanced_consumer:
   if message is not None: 
       line = json.loads(message.value.decode("utf-8"))
       log = line["message"]
       tmp_lst = log.split()
       ip = tmp_lst[0]
       dt = tmp_lst[3].replace("[","")
       bt = tmp_lst[9]
       dt = trans_time(dt)
       prov, isp = resolv_ip(ip)
       if prov and isp:
          print(prov, isp,dt)

效果图:

 

日志收集平台详细架构图:

附带一些原理

 

 

以上是关于基于kafka的日志收集分析平台的主要内容,如果未能解决你的问题,请参考以下文章

zookeeper + kafka + OpenRestry + Lua + Apache Druid实现日志收集与分析

猎聘网架构中间件负责人:基于Flume+Kafka+ Elasticsearch+Storm的海量日志实时分析平台

知乎基于Kubernetes的kafka平台的设计和实现

基于Flume+Kafka+ Elasticsearch+Storm的海量日志实时分析平台

搭建Graylog2集群(基于ElasticSearch的日志收集分析平台)

结合Docker快速搭建ELK日志收集分析平台