ELK Log Analysis System Integrated with a Kafka/ZooKeeper Cluster
Posted by 开源搬运工宋师傅
Cluster Host Environment
| Hostname | Server IP | Software Version | Role |
| --- | --- | --- | --- |
| elk-node1 | 192.168.99.185 | elasticsearch-6.8.4-1.noarch, logstash-6.8.4-1.noarch, kibana-6.8.4-1.x86_64, openjdk 1.8.0_242 | es master/data node, kibana web, logstash |
| elk-node2 | 192.168.99.186 | elasticsearch-6.8.4-1.noarch, logstash-6.8.4-1.noarch, openjdk 1.8.0_242 | es data node, logstash |
| kafka-node1 | 192.168.99.233 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk 1.8.0_242 | kafka/zookeeper |
| kafka-node2 | 192.168.99.232 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk 1.8.0_242 | kafka/zookeeper |
| kafka-node3 | 192.168.99.221 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk 1.8.0_242 | kafka/zookeeper |
| zabbix-server | 192.168.99.50 | filebeat-6.8.4-1.x86_64 | filebeat |
Log Collection and Analysis System Architecture
ELK Cluster Configuration
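The Elasticsearch setup itself is not reproduced in the original. As a minimal sketch of how the roles in the table above map to elasticsearch.yml for the 6.8 series (the cluster name my-elk and the RPM config path are assumptions):

```sh
# Hypothetical minimal elasticsearch.yml for elk-node1 (master + data).
# elk-node2 is the same except node.name, network.host, and node.master: false.
cat > /etc/elasticsearch/elasticsearch.yml <<'EOF'
cluster.name: my-elk
node.name: elk-node1
node.master: true
node.data: true
network.host: 192.168.99.185
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.99.185", "192.168.99.186"]
EOF
systemctl restart elasticsearch
```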
Kafka Cluster Configuration
Note: starting with ZooKeeper 3.5.5, the packages whose names contain "bin.tar.gz" are pre-built binaries that can be used directly, while the plain "tar.gz" packages contain only source code and cannot be used as-is. If you install from the "apache-zookeeper-3.5.7.tar.gz" package, starting the service with zkServer.sh start will fail and client connections will also error out. The correct package is "apache-zookeeper-3.5.7-bin.tar.gz".
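The ZooKeeper and Kafka configuration files are not shown in the original. The following is a minimal sketch for kafka-node1, assuming the packages are unpacked under /opt (a hypothetical path); the other two nodes differ only in the myid value, broker.id, and the listener address:

```sh
# ZooKeeper: one server.N line per node; myid on each host must match its N.
cat > /opt/apache-zookeeper-3.5.7-bin/conf/zoo.cfg <<'EOF'
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper-data
clientPort=2181
server.1=192.168.99.233:2888:3888
server.2=192.168.99.232:2888:3888
server.3=192.168.99.221:2888:3888
EOF
mkdir -p /opt/zookeeper-data && echo 1 > /opt/zookeeper-data/myid

# Kafka: the essential broker settings. Appending works because later
# definitions override earlier ones in Java properties files, but editing
# the shipped server.properties in place is cleaner.
cat >> /opt/kafka_2.12-2.5.0/config/server.properties <<'EOF'
broker.id=1
listeners=PLAINTEXT://192.168.99.233:9092
zookeeper.connect=192.168.99.233:2181,192.168.99.232:2181,192.168.99.221:2181
EOF

# Start ZooKeeper first, then Kafka, on every node:
/opt/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start
/opt/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-2.5.0/config/server.properties
```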
Network Device Log Server Configuration
For the rsyslog network log server setup, see the article 《》 in this public account's ELK column.
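That article is not reproduced here. As a rough sketch of what the rsyslog side needs to provide for the filebeat inputs below (the /mnt/huawei, /mnt/h3c, and /mnt/ruijie paths come from the filebeat config; the rule names and the source IP ranges are assumptions):

```sh
# Hypothetical /etc/rsyslog.d/network.conf: listen on UDP 514 and write each
# vendor's devices (selected here by an assumed source IP range) to its own
# directory, one file per device.
cat > /etc/rsyslog.d/network.conf <<'EOF'
module(load="imudp")
input(type="imudp" port="514")

template(name="huawei" type="string" string="/mnt/huawei/%fromhost-ip%.log")
if $fromhost-ip startswith '192.168.1.' then { action(type="omfile" dynaFile="huawei") stop }

template(name="h3c" type="string" string="/mnt/h3c/%fromhost-ip%.log")
if $fromhost-ip startswith '192.168.2.' then { action(type="omfile" dynaFile="h3c") stop }

template(name="ruijie" type="string" string="/mnt/ruijie/%fromhost-ip%.log")
if $fromhost-ip startswith '192.168.3.' then { action(type="omfile" dynaFile="ruijie") stop }
EOF
systemctl restart rsyslog
```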
Filebeat config
filebeat acts as the Kafka message producer. On the filebeat host the logs fall into two groups: network device logs and Linux system logs. Each network device vendor and each kind of Linux log is marked with its own tags value, so that logstash can match on tags and apply a different field-parsing pipeline to each. Each group is also published to the Kafka cluster under its own log_topic: network device logs use log_topic=network and Linux system logs use log_topic=linuxos.
egrep -v "^#|^$" /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /mnt/huawei/*
  fields:
    log_topic: network
  tags: ["huawei"]
  include_lines: ['Failed','failed','error','ERROR','DOWN','down','UP','up']
- type: log
  paths:
    - /mnt/h3c/*
  fields:
    log_topic: network
  tags: ["h3c"]
  include_lines: ['Failed','failed','error','ERROR','DOWN','down','UP','up']
- type: log
  paths:
    - /mnt/ruijie/*
  fields:
    log_topic: network
  tags: ["ruijie"]
  include_lines: ['Failed','failed','error','ERROR','DOWN','down','UP','up']
- type: log
  enabled: true
  tags: ["secure"]
  paths:
    - /var/log/secure
  fields:
    log_topic: linuxos
  include_lines: [".*Failed.*",".*Accepted.*"]
- type: log
  enabled: true
  paths:
    - /var/log/messages
  fields:
    log_topic: linuxos
  tags: ["messages"]
  include_lines: ['Failed','error','ERROR']
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
name: 192.168.99.185
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
output.kafka:
  enabled: true
  hosts: ["192.168.99.233:9092","192.168.99.232:9092","192.168.99.221:9092"]
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000
processors:
- drop_fields:
    fields: ["beat", "input","host","log","source","name","os"]
- add_host_metadata: ~
- add_cloud_metadata: ~
Note: the filebeat Kafka output may fail to connect to the Kafka cluster. Watch the filebeat log at /var/log/filebeat/filebeat, e.g. with "tail -f /var/log/filebeat/filebeat".
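Before chasing connection errors in the log, it is worth validating the configuration and the Kafka output directly; both subcommands exist in filebeat 6.x:

```sh
filebeat test config -c /etc/filebeat/filebeat.yml   # syntax/validity check
filebeat test output -c /etc/filebeat/filebeat.yml   # tries to reach each Kafka broker
```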
Logstash config
The two logstash instances act as consumers of the Kafka cluster: host 192.168.99.185 parses the network device logs and host 192.168.99.186 parses the Linux system logs (both pipelines could of course run on a single logstash instance). Both configurations below include an output to Zabbix for alerting; if you do not need the Zabbix integration, simply delete the Zabbix-related parts.
The logstash log lives at "/var/log/logstash/logstash-plain.log"; watch it with "tail -f /var/log/logstash/logstash-plain.log".
network logstash
[root@elk-node1 conf.d]# cat network.conf
input {
  kafka {
    codec => "json"
    topics => ["network"]
    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
    group_id => "logstash"
  }
}
filter {
  if "huawei" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}" }
    }
  }
  else if "h3c" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{YEAR:year} %{DATA:hostname} %{GREEDYDATA:info}" }
    }
  }
  else if "ruijie" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}" }
    }
  }
  mutate {
    add_field => [ "[zabbix_key]", "networklogs" ]
    add_field => [ "[zabbix_host]", "192.168.99.185" ]
    add_field => [ "count", "%{hostname}:%{info}" ]
    remove_field => ["message","time","year","offset","tags","path","host","@version","[log]","[prospector]","[beat]","[input][type]","[source]"]
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    index => "networklogs-%{+YYYY.MM.dd}"
    hosts => ["192.168.99.185:9200"]
    user => "elastic"
    password => "qZXo7E"
    sniffing => false
  }
  if [count] =~ /(ERR|error|ERROR|Failed|failed)/ {
    zabbix {
      zabbix_host => "[zabbix_host]"
      zabbix_key => "[zabbix_key]"
      zabbix_server_host => "192.168.99.200"
      zabbix_server_port => "10051"
      zabbix_value => "count"
    }
  }
}
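A quick way to catch syntax errors in a pipeline file before restarting the service (the binary and settings paths below are the standard RPM locations):

```sh
/usr/share/logstash/bin/logstash --config.test_and_exit \
  --path.settings /etc/logstash -f /etc/logstash/conf.d/network.conf
```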
linuxos logstash
[root@elk-node2 ~]# cat /etc/logstash/conf.d/system.conf
input {
  kafka {
    codec => "json"
    topics => ["linuxos"]
    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
  }
}
filter {
  if "secure" in [tags] {
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:host1} .*?: %{DATA:status} .*? for %{USER:user} from %{IP:clients} port %{NUMBER:port} .*?"
      }
    }
    mutate {
      add_field => [ "[zabbix_key]", "securelogs" ]
      add_field => [ "[zabbix_host]", "192.168.99.186" ]
      add_field => [ "count1", "%{host1}--%{message}" ]
    }
  }
  else if "messages" in [tags] {
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:host1} %{DATA:syslog_prom} .*?"
      }
    }
  }
  mutate {
    remove_field => ["time","offset","path","host","@version","[log]","[prospector]","[beat]","[input][type]","[source]"]
  }
}
output {
  stdout { codec => rubydebug }
  if "secure" in [tags] {
    elasticsearch {
      index => "secure-%{+YYYY.MM.dd}"
      hosts => ["192.168.99.186:9200"]
      user => "elastic"
      password => "qZXo7E"
    }
  }
  if "messages" in [tags] {
    elasticsearch {
      index => "messages-%{+YYYY.MM.dd}"
      hosts => ["192.168.99.186:9200"]
      user => "elastic"
      password => "qZXo7E"
    }
  }
  if [count1] =~ /(Failed|Accepted)/ {
    zabbix {
      zabbix_host => "[zabbix_host]"
      zabbix_key => "[zabbix_key]"
      zabbix_server_host => "192.168.99.200"
      zabbix_server_port => "10051"
      zabbix_value => "count1"
    }
  }
}
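The zabbix outputs above require matching trapper items on the Zabbix server: hosts named 192.168.99.185 / 192.168.99.186 with item keys networklogs and securelogs. The trapper item can be exercised by hand with zabbix_sender before relying on logstash (the -o value below is arbitrary):

```sh
zabbix_sender -z 192.168.99.200 -p 10051 -s "192.168.99.186" -k securelogs -o "manual trapper test"
```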
Kafka Cluster Verification and Testing
List the topics created by filebeat
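The command itself is elided in the original. With the stock Kafka 2.5 CLI on one of the brokers it would typically look like this (the install path is an assumption); the listing should show the two filebeat topics alongside Kafka's internal __consumer_offsets topic:

```sh
/opt/kafka_2.12-2.5.0/bin/kafka-topics.sh --list --bootstrap-server 192.168.99.233:9092
```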
__consumer_offsets
linuxos
network
Verify that consumers can read the messages
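Again the original elides the commands. The stock console consumer can be pointed at each topic to confirm that filebeat's messages arrive (install path assumed):

```sh
/opt/kafka_2.12-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.233:9092 --topic network --from-beginning
/opt/kafka_2.12-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.233:9092 --topic linuxos --from-beginning
```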
Check the fields parsed by logstash (with the stdout rubydebug output configured above, events from a systemd-managed logstash end up in /var/log/messages):
tail -f /var/log/messages
Kibana Web UI
user authentication
Discover networklogs index
Discover secure index
Discover messages index
Network device log dashboard