Integrating an ELK Log Analysis System with a Kafka/ZooKeeper Cluster

Posted by 开源搬运工宋师傅


Cluster Host Environment

| Hostname | Server IP | Software Version | Role |
| --- | --- | --- | --- |
| elk-node1 | 192.168.99.185 | elasticsearch-6.8.4-1.noarch, logstash-6.8.4-1.noarch, kibana-6.8.4-1.x86_64, openjdk 1.8.0_242 | ES master/data node, Kibana web, Logstash |
| elk-node2 | 192.168.99.186 | elasticsearch-6.8.4-1.noarch, logstash-6.8.4-1.noarch, openjdk 1.8.0_242 | ES data node, Logstash |
| kafka-node1 | 192.168.99.233 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk 1.8.0_242 | Kafka/ZooKeeper |
| kafka-node2 | 192.168.99.232 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk 1.8.0_242 | Kafka/ZooKeeper |
| kafka-node3 | 192.168.99.221 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk 1.8.0_242 | Kafka/ZooKeeper |
| zabbix-server | 192.168.99.50 | filebeat-6.8.4-1.x86_64 | Filebeat |

Log Collection and Analysis Architecture
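
In this setup, Filebeat on the rsyslog/Zabbix host acts as the producer: it ships network device logs and Linux system logs into the Kafka topics network and linuxos. The two Logstash instances consume from the Kafka cluster, clean the fields, and index the events into Elasticsearch; Kibana provides search and dashboards, and events matching failure patterns are forwarded to Zabbix for alerting.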

ELK Cluster Configuration
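
The Elasticsearch settings themselves are not reproduced in the original post. A minimal elasticsearch.yml sketch for the two nodes in the table above might look as follows; the cluster.name is a hypothetical choice, and xpack.security.enabled reflects the fact that the Logstash outputs below authenticate as the elastic user:

# /etc/elasticsearch/elasticsearch.yml on elk-node1 (192.168.99.185)
cluster.name: elk-cluster                  # hypothetical name
node.name: elk-node1
node.master: true                          # master-eligible data node
node.data: true
network.host: 192.168.99.185
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.99.185", "192.168.99.186"]
xpack.security.enabled: true               # pipelines below log in as user "elastic"

# elk-node2 (192.168.99.186) differs only in:
#   node.name: elk-node2
#   node.master: false                     # data-only node
#   network.host: 192.168.99.186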

Kafka Cluster Configuration

Note: starting with ZooKeeper 3.5.5, the packages whose names contain "bin.tar.gz" are pre-built binaries that can be used directly, while the plain "tar.gz" packages contain only source code and cannot be used as-is. With the "apache-zookeeper-3.5.7.tar.gz" package, starting the service with zkServer.sh start fails and client connections also error out. The correct package to install is "apache-zookeeper-3.5.7-bin.tar.gz".
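
The installation steps are not shown in the original post. A minimal sketch for kafka-node1 follows; the install prefix, dataDir, and id numbering are assumptions, and nodes 2 and 3 get myid 2 and 3 with their own listener addresses:

# unpack the pre-built ZooKeeper binary package
tar -zxf apache-zookeeper-3.5.7-bin.tar.gz -C /opt
cat > /opt/apache-zookeeper-3.5.7-bin/conf/zoo.cfg <<'EOF'
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper-data
clientPort=2181
server.1=192.168.99.233:2888:3888
server.2=192.168.99.232:2888:3888
server.3=192.168.99.221:2888:3888
EOF
mkdir -p /opt/zookeeper-data && echo 1 > /opt/zookeeper-data/myid
/opt/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

# unpack Kafka; on each node set a unique broker.id and the local listener
# in config/server.properties, and point zookeeper.connect at the ensemble:
#   broker.id=1
#   listeners=PLAINTEXT://192.168.99.233:9092
#   zookeeper.connect=192.168.99.233:2181,192.168.99.232:2181,192.168.99.221:2181
tar -zxf kafka_2.12-2.5.0.tgz -C /opt
/opt/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-2.5.0/config/server.properties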

Network Device Log Server Configuration

For the rsyslog network log server configuration, see the article 《》 in this public account's ELK column.
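
That article is not reproduced here, but the essence is that rsyslog receives syslog from the network devices and writes it into the per-vendor directories that Filebeat watches (/mnt/huawei, /mnt/h3c, /mnt/ruijie). A minimal sketch, with a hypothetical device address:

# /etc/rsyslog.d/network.conf -- receive device syslog on UDP 514 and
# sort it into the directories Filebeat reads from
module(load="imudp")
input(type="imudp" port="514")
if $fromhost-ip == '192.168.99.10' then {    # hypothetical Huawei switch IP
    action(type="omfile" file="/mnt/huawei/huawei.log")
    stop
}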


Filebeat config

Filebeat acts as the Kafka producer. On the Filebeat host the logs fall into two groups, network device logs and Linux system logs; the different network device vendors and the different kinds of Linux system logs are distinguished with tags, so that Logstash can match on those tags and apply a different field-cleaning pipeline to each. Each group is also published to the Kafka cluster under its own log_topic: network device logs use log_topic=network and Linux system logs use log_topic=linuxos.

egrep -v "^#|^$" /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /mnt/huawei/*
  fields:
    log_topic: network
  tags: ["huawei"]
  include_lines: ['Failed','failed','error','ERROR','DOWN','down','UP','up']
- type: log
  paths:
    - /mnt/h3c/*
  fields:
    log_topic: network
  tags: ["h3c"]
  include_lines: ['Failed','failed','error','ERROR','DOWN','down','UP','up']
- type: log
  paths:
    - /mnt/ruijie/*
  fields:
    log_topic: network
  tags: ["ruijie"]
  include_lines: ['Failed','failed','error','ERROR','DOWN','down','UP','up']
- type: log
  enabled: true
  tags: ["secure"]
  paths:
    - /var/log/secure
  fields:
    log_topic: linuxos
  include_lines: [".*Failed.*",".*Accepted.*"]
- type: log
  enabled: true
  paths:
    - /var/log/messages
  fields:
    log_topic: linuxos
  tags: ["messages"]
  include_lines: ['Failed','error','ERROR']
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
name: 192.168.99.185
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
output.kafka:
  enabled: true
  hosts: ["192.168.99.233:9092","192.168.99.232:9092","192.168.99.221:9092"]
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000
processors:
  - drop_fields:
      fields: ["beat", "input","host","log","source","name","os"]
  - add_host_metadata: ~
  - add_cloud_metadata: ~

Note: the Filebeat Kafka output may fail to connect to the Kafka cluster. Watch the Filebeat log at /var/log/filebeat/filebeat, e.g. with "tail -f /var/log/filebeat/filebeat".
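
When that happens it helps to first confirm that the brokers are reachable from the Filebeat host, for example (assuming nc is available, and running the Kafka tool from a Kafka install directory):

nc -zv 192.168.99.233 9092
nc -zv 192.168.99.232 9092
nc -zv 192.168.99.221 9092
# confirm the brokers answer on their advertised listeners
./bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.99.233:9092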


logstash config

The two Logstash instances act as consumers of the Kafka cluster: host 192.168.99.185 cleans the network device logs and host 192.168.99.186 cleans the Linux system logs, although both cleaning pipelines could just as well run on a single Logstash instance. The configurations below include the output to Zabbix alerting; if you do not need to integrate with a Zabbix alerting platform, delete the Zabbix-related parts.

The Logstash log path is "/var/log/logstash/logstash-plain.log"; view it with "tail -f /var/log/logstash/logstash-plain.log".
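
Before (re)starting the service it is also worth validating the pipeline syntax; with the RPM install used here the binary lives under /usr/share/logstash:

/usr/share/logstash/bin/logstash --path.settings /etc/logstash -f /etc/logstash/conf.d/network.conf --config.test_and_exit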

network logstash

[root@elk-node1 conf.d]# cat network.conf
input {
  kafka {
    codec => "json"
    topics => ["network"]
    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
    group_id => "logstash"
  }
}
filter {
  if "huawei" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}" }
    }
  }
  else if "h3c" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{YEAR:year} %{DATA:hostname} %{GREEDYDATA:info}" }
    }
  }
  else if "ruijie" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}" }
    }
  }
  mutate {
    add_field => [ "[zabbix_key]", "networklogs" ]
    add_field => [ "[zabbix_host]", "192.168.99.185" ]
    add_field => [ "count", "%{hostname}:%{info}" ]
    remove_field => ["message","time","year","offset","tags","path","host","@version","[log]","[prospector]","[beat]","[input][type]","[source]"]
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    index => "networklogs-%{+YYYY.MM.dd}"
    hosts => ["192.168.99.185:9200"]
    user => "elastic"
    password => "qZXo7E"
    sniffing => false
  }
  if [count] =~ /(ERR|error|ERROR|Failed|failed)/ {
    zabbix {
      zabbix_host => "[zabbix_host]"
      zabbix_key => "[zabbix_key]"
      zabbix_server_host => "192.168.99.200"
      zabbix_server_port => "10051"
      zabbix_value => "count"
    }
  }
}
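
Note that zabbix_host => "[zabbix_host]" and zabbix_key => "[zabbix_key]" are field references: the logstash-output-zabbix plugin reads the Zabbix host name and item key from those event fields, which the mutate block above populates, and zabbix_value => "count" sends the contents of the count field as the item value. The Zabbix host named 192.168.99.185 therefore needs a matching trapper item with key networklogs.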

linuxos logstash

[root@elk-node2 ~]# cat /etc/logstash/conf.d/system.conf
input {
  kafka {
    codec => "json"
    topics => ["linuxos"]
    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
  }
}
filter {
  if "secure" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:host1} .*?: %{DATA:status} .*? for %{USER:user} from %{IP:clients} port %{NUMBER:port} .*?" }
    }
    mutate {
      add_field => [ "[zabbix_key]", "securelogs" ]
      add_field => [ "[zabbix_host]", "192.168.99.186" ]
      add_field => [ "count1", "%{host1}--%{message}" ]
    }
  }
  else if "messages" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:host1} %{DATA:syslog_prom} .*?" }
    }
  }
  mutate {
    remove_field => ["time","offset","path","host","@version","[log]","[prospector]","[beat]","[input][type]","[source]"]
  }
}
output {
  stdout { codec => rubydebug }
  if "secure" in [tags] {
    elasticsearch {
      index => "secure-%{+YYYY.MM.dd}"
      hosts => ["192.168.99.186:9200"]
      user => "elastic"
      password => "qZXo7E"
    }
  }
  if "messages" in [tags] {
    elasticsearch {
      index => "messages-%{+YYYY.MM.dd}"
      hosts => ["192.168.99.186:9200"]
      user => "elastic"
      password => "qZXo7E"
    }
  }
  if [count1] =~ /(Failed|Accepted)/ {
    zabbix {
      zabbix_host => "[zabbix_host]"
      zabbix_key => "[zabbix_key]"
      zabbix_server_host => "192.168.99.200"
      zabbix_server_port => "10051"
      zabbix_value => "count1"
    }
  }
}
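
As a rough example, a /var/log/secure line such as "Apr 3 10:20:30 elk-node2 sshd[2211]: Failed password for root from 192.168.99.99 port 54321 ssh2" (a hypothetical sample) would come out of the grok filter with status=Failed, user=root, clients=192.168.99.99 and port=54321; count1 then matches /(Failed|Accepted)/ and the event is forwarded to Zabbix.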

Kafka Cluster Verification

List the topics created by Filebeat

[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-topics.sh --list --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181
__consumer_offsets
linuxos
network
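
Partition and replica placement for the auto-created topics can be checked the same way:

[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181 --topic network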

Verify that the consumers receive messages

# consume the network topic
[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092 --topic network --from-beginning
# consume the linuxos topic
[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092 --topic linuxos --from-beginning
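
A quick end-to-end check is to generate a matching line on the Filebeat host and watch it arrive on the topic (assuming rsyslog routes user-level messages to /var/log/messages, as is the CentOS default):

# on the Filebeat host: the word ERROR matches the include_lines filter above
logger "ERROR elk-kafka pipeline test"
# the message should then show up in the linuxos console consumer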



Check the cleaned fields output by Logstash

tail -f /var/log/messages
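
Since both pipelines also write events to stdout with codec => rubydebug, the cleaned fields can be followed from the Logstash service output as well (assuming Logstash runs under systemd):

journalctl -u logstash -f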


Kibana Web UI

user authentication



Discover networklogs index


Discover secure index


Discover messages index

Network device log dashboard

