搭建办公环境ElasticSearch 日志分析系统
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了搭建办公环境ElasticSearch 日志分析系统相关的知识,希望对你有一定的参考价值。
搭建办公环境ElasticSearch 日志分析系统? 计划将公司的防火墙+交换机+服务器(centos7)+ Vmware+Windows server纳入到监控范围,所以开启了ELK监控之旅。
? 本文采用ELK架构栈进行组建,万丈高楼平地起,虽然开始比较简陋,后期会不断完善这个日志分析系统。
? 全文框架如下:
? Hillstone: syslog→logstash→elasticsearch→kibana
? H3C: syslog→logstash→elasticsearch→kibana
? ESXI: syslog→logstash→elasticsearch→kibana
? Vcenter: syslog→logstash→elasticsearch→kibana
? Windows server: winlogbeat→logstash→elasticsearch→kibana
? linux server: filebeate→lasticsearch→kibana
? ELK说明:
? ELK1: 192.168.20.18:9200
? ELK2: 192.168.20.19:9200
? 规划:
? Logstash: 192.168.20.18
? 不同服务根据端口不同进行标记,创建不同的索引。
?
1 Hillstone部分
1.1 Hillstone配置
1.1.1 配置步骤
? 本文通过web界面配置,当然也能进行命令行配置,具体配置请参考链接。
? 找到Stoneos-日志管理-log配置-日志管理器,配置服务器日志:
? 主机名: 192.168.20.18
? 绑定方式: 虚拟路由器 trust-vr
? 协议: UDP
? 端口: 514
? //我使用的root运行,非root账号使用端口在1024以上。
?
1.1.2 参考链接
1.2 添加logstash 配置文件
1.2.1 创建配置测试文件
cat > /data/config/test-hillstone.config << EOF
input{
udp {port => 518 type => "Hillstone"}
}
output {
stdout { codec=> rubydebug }
}
EOF
logstash -f test-hillstone.config
1.2.2 选取其中一条日志进行分析调试。
<190>Nov 29 17:24:52 1404726150004842(root) 44243624 Traffic@FLOW: SESSION: 10.6.2.43:49608->192.168.20.160:11800(TCP), application TCP-ANY, interface tunnel6, vr trust-vr, policy 1, user -@-, host -, send packets 1,send bytes 74,receive packets 1,receive bytes 110,start time 2019-11-29 17:24:50,close time 2019-11-29 17:24:52,session end,TCP RST
u0000
1.2.3Logstash分析说明
可以通过grok debug网站进行自动匹配(https://grokdebug.herokuapp.com/discover?#),再根据分析出来的日志,进行二次调整。
同样晚上有很多案例进行参考,可以先去参考别人想法,再补充自己的想法。
关于grok部分详细讲解,请参考https://coding.imooc.com/class/181.html,老师讲的很棒,当然吾爱破解论坛和B站,有免费版。
1.2.4 Logstash综合配置
? 只选取了会话+NAT部分
cat > /data/config/hillstone.config<< EOF
input{
udp {
port => 518
type => "hillstone"
}
}
filter {
grok {
##流量日志
#SESSION会话结束日志
match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{BASE10NUM:serial}(%{WORD:ROOT}) %{DATA:logid} %{DATA:Sort}@%{DATA:Class}: %{DATA:module}: %{IPV4:srcip}:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}(%{DATA:protocol}), application %{USER:app}, interface %{DATA:interface}, vr %{USER:vr}, policy %{DATA:policy}, user %{USERNAME:user}@%{DATA:AAAserver}, host %{USER:HOST}, send packets %{BASE10NUM:sendPackets},send bytes %{BASE10NUM:sendBytes},receive packets %{BASE10NUM:receivePackets},receive bytes %{BASE10NUM:receiveBytes},start time %{TIMESTAMP_ISO8601:startTime},close time %{TIMESTAMP_ISO8601:closeTime},session %{WORD:state},%{GREEDYDATA:reason}"}
#SESSION会话开始日志
match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{BASE10NUM:serial}(%{WORD:ROOT}) %{DATA:logid} %{DATA:Sort}@%{DATA:Class}: %{DATA:module}: %{IPV4:srcip}:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}(%{DATA:protocol}), interface %{DATA:interface}, vr %{DATA:vr}, policy %{DATA:policy}, user %{USERNAME:user}@%{DATA:AAAserver}, host %{USER:HOST}, session %{WORD:state}%{GREEDYDATA:reason}"}
#SNAT日志
match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{BASE10NUM:serial}(%{WORD:ROOT}) %{DATA:logid} %{DATA:Sort}@%{DATA:Class}: %{DATA:module}: %{IPV4:srcip}:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}(%{DATA:protocol}), %{WORD:state} to %{IPV4:snatip}:%{BASE10NUM:snatport}, vr %{DATA:vr}, user %{USERNAME:user}@%{DATA:AAAserver}, host %{DATA:HOST}, rule %{BASE10NUM:rule}"}
#DNAT日志
match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{BASE10NUM:serial}(%{WORD:ROOT}) %{DATA:logid} %{DATA:Sort}@%{DATA:Class}: %{DATA:module}: %{IPV4:srcip}:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}(%{DATA:protocol}), %{WORD:state} to %{IPV4:dnatip}:%{BASE10NUM:dnatport}, vr %{DATA:vr}, user %{USERNAME:user}@%{DATA:AAAserver}, host %{DATA:HOST}, rule %{BASE10NUM:rule}"}
}
mutate {
lowercase => [ "module" ]
remove_field => ["host", "message", "ROOT", "HOST", "serial", "syslog_pri", "timestamp", "mac", "AAAserver", "user"]
}
}
output {
elasticsearch {
hosts => "192.168.20.18:9200" #elasticsearch服务地址
index => "logstash-hillstone-%{module}-%{state}-%{+YYYY.MM.dd}"
}
}
EOF
3.1.2.3 参考文件
2 H3C交换机部分
2.1 H3C Syslog转发
2.1.1 H3C交换机配置
? 本文通过命令行进行配置,具体配置请参考链接。
-
? 将交换机的时间设置正确
clock datetime hh:mm:ss year/month/day save force
-
设置交换机syslog转发。
system-view info-center enable // 开启info-center info-center loghost 192.168.20.18 port 516 facility local8 // 设置日志主机/端口/日志级别 info-center source default loghost level informational //设置日志级别 save force
2.1.2 参考链接
? H3C设置时间
2.2 添加logstash 配置文件
2.2.1 创建配置测试文件
cat > /data/config/test-h3c.congfig << EOF
input{
udp {port => 516 type => "h3c"}
}
output {
stdout { codec=> rubydebug }
}
EOF
2.2.2 选取一条日志进行分析调试。
<190>Nov 30 16:27:23 1404726150004842(root) 44243622 Traffic@FLOW: SESSION: 10.6.4.178:48150->192.168.20.161:11800(TCP), interface tunnel6, vr trust-vr, policy 1, user -@-, host -, session start
u0000
2.2.3 mutate基本用法说明
? 参考链接: https://blog.csdn.net/qq_34624315/article/details/83013531
2.2.4 综合配置。
cat > H3C.conf <<EOF
###h3c 日志过滤
grok {
match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{DATA:year} %{DATA:hostname} \%\%%{DATA:ddModuleName}/%{POSINT:severity}/%{DATA:brief}: %{GREEDYDATA:reason}" }
add_field => {"severity_code" => "%{severity}"}
}
mutate {
gsub => [
"severity", "0", "Emergency",
"severity", "1", "Alert",
"severity", "2", "Critical",
"severity", "3", "Error",
"severity", "4", "Warning",
"severity", "5", "Notice",
"severity", "6", "Informational",
"severity", "7", "Debug"
]
remove_field => ["message", "syslog_pri"]
}
}
output {
stdout { codec=> rubydebug }
# elasticsearch {
# hosts => "192.168.20.18:9200" #elasticsearch服务地址
# index => "logstash-h3c-%{+YYYY.MM.dd}"
# }
}
EOF
2.2.5 参考文件
3 Vmware部分
3.1 Esxi日志收集部分
? 主要是收集ESXI机器日志,方便进行安全日志分析;
? 主要通过syslog进行日志收集,再通过ELK栈提供的logstash进行分析
? ESXI-syslog--logstash--elasticsearch
3.1.1 Esxi机器syslog服务器配置
3.1.1.1 配置步骤
? 本文通过客户端配置,当然也能进行web配置,方法基本一致,具体配置请参考链接。
-
? 开启syslog服务:
打开esxi客户端-选择主机-主机配置-高级设置-syslog-设置远程syslog服务器为: udp://192.168.20.18:514
-
允许防火墙放行。
打开esxi客户端-选择主机-主机配置-安全配置文件-防火墙-编辑-勾选syslog服务器,点击确定。
3.1.1.2 参考链接
Monitoring VMWare ESXi with the ELK Stack
3.1.2 添加logstash 配置文件
3.1.2.1 创建配置测试文件
cat > /data/config/test-vmware.config << EOF input{ udp {port => 514 type => "Hillstone"} } output { stdout { codec=> rubydebug } } EOF logstash -f test-vmware.config
3.1.2.2 选取其中一条日志进行分析调试。
<167>2019-12-03T07:36:11.689Z localhost.localdomain Vpxa: verbose vpxa[644C8B70] [Originator@6876 sub=VpxaHalCnxHostagent opID=WFU-2d14bc3d] [WaitForUpdatesDone] Completed callback
3.1.2.3 结合网上的综合案例对测试文件进行配置改造。
cat > vmware.conf <<EOF input{ udp { port => 514 type => "vmware" } } filter { if "vmware" in [type] { grok { break_on_match => true match => [ "message", "<%{POSINT:syslog_pri}>%{TIMESTAMP_ISO8601:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{SYSLOGPROG:syslog_program}: (?<message-body>(?<message_system_info>(?:[%{DATA:message_thread_id} %{DATA:syslog_level} ‘%{DATA:message_service}‘ ?%{DATA:message_opID}])) [%{DATA:message_service_info}] (?<syslog_message>(%{GREEDYDATA})))", "message", "<%{POSINT:syslog_pri}>%{TIMESTAMP_ISO8601:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{SYSLOGPROG:syslog_program}: (?<message-body>(?<message_system_info>(?:[%{DATA:message_thread_id} %{DATA:syslog_level} ‘%{DATA:message_service}‘ ?%{DATA:message_opID}])) (?<syslog_message>(%{GREEDYDATA})))", "message", "<%{POSINT:syslog_pri}>%{TIMESTAMP_ISO8601:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{SYSLOGPROG:syslog_program}: %{GREEDYDATA:syslog_message}" ] } date { match => [ "syslog_timestamp", "YYYY-MM-ddHH:mm:ss", "ISO8601" ] } mutate { replace => [ "@source_host", "%{syslog_hostname}" ] } mutate { replace => [ "@message", "%{syslog_message}" ] } mutate { remove_field => ["@source_host","program","@timestamp","syslog_hostname","@message"] } if "Device naa" in [message] { grok { break_on_match => false match => [ "message", "Device naa.%{WORD:device_naa} performance has %{WORD:device_status}%{GREEDYDATA} of %{INT:datastore_latency_from}%{GREEDYDATA} to %{INT:datastore_latency_to}", "message", "Device naa.%{WORD:device_naa} performance has %{WORD:device_status}%{GREEDYDATA} from %{INT:datastore_latency_from}%{GREEDYDATA} to %{INT:datastore_latency_to}" ] } } if "connectivity issues" in [message] { grok { match => [ "message", "Hostd: %{GREEDYDATA} : %{DATA:device_access} to volume %{DATA:device_id} %{DATA:datastore} (following|due to)" ] } } if "WARNING" in [message] { grok { match => [ "message", "WARNING: %{GREEDYDATA:vmware_warning_msg}" ] } } } } output { elasticsearch { hosts => "192.168.20.18:9200" #elasticsearch服务地址 index => "logstash-vmware-%{+YYYY.MM.dd}" } # stdout { codec=> rubydebug } } EOF
3.1.2.3 参考文件
3.2 Vcenter日志收集部分
3.2.1 Vcenter-syslog服务器配置
? 主要是收集vcsa机器日志,方便进行安全日志分析;
? 主要通过syslog进行日志收集,再通过ELK栈提供的logstash进行分析
? VCSA-Syslog--Logstash--Elasticsearch
3.2.1.1 配置步骤
? 打开VCSA的管理后台URL: http://192.168.20.90:5480,输入账号和密码(开机root和密码)--点击syslog配置中心,输入syslog配置信息。
3.2.1.2 参考链接
VCSA 6.5 forward to multiple syslog
3.2.2 添加logstash 配置文件
3.2.2.1 创建测试配置
input{ udp { port => 1514 type => "vcenter" } } output { stdout { codec=> rubydebug } }
3.2.2.2 选取一条日志进行分析调试
<14>1 2019-12-05T02:44:17.640474+00:00 photon-machine vpxd 4035 - - Event [4184629] [1-1] [2019-12-05T02:44:00.017928Z] [vim.event.UserLoginSessionEvent] [info] [root] [Datacenter] [4184629] [User root@192.168.20.17 logged in as pyvmomi Python/3.6.8 (Linux; 3.10.0-957.el7.x86_64; x86_64)]
3.2.2.3 结合网上的综合案例对配置文件进行配置改造。
cat > vcenter.conf <<EOF input{ udp { port => 1514 type => "vcenter" } } filter { if "vcenter" in [type] { } grok { break_on_match => true match => [ "message", "<%{NONNEGINT:syslog_pri}>%{NONNEGINT:syslog_ver} +(?:%{TIMESTAMP_ISO8601:syslog_timestamp}|-) +(?:%{HOSTNAME:syslog_hostname}|-) +(-|%{SYSLOG5424PRINTASCII:syslog_program}) +(-|%{SYSLOG5424PRINTASCII:syslog_proc}) +(-|%{SYSLOG5424PRINTASCII:syslog_msgid}) +(?:%{SYSLOG5424SD:syslog_sd}|-|) +%{GREEDYDATA:syslog_msg}" ] } date { match => [ "syslog_timestamp", "YYYY-MM-ddHH:mm:ss,SSS", "YYYY-MM-dd HH:mm:ss,SSS", "ISO8601" ] #timezone => "UTC" #For vCenter Appliance #timezone => "Asia/Shanghai" } mutate { remove_field => ["syslog_ver", "syslog_pri"] } } output { elasticsearch { hosts => "192.168.20.18:9200" #elasticsearch服务地址 index => "logstash-vcenter-%{+YYYY.MM.dd}" } # stdout { codec=> rubydebug } } EOF
3.2.2.4 参考文件
4 Windows server部分
4.1 windows server日志收集部分
? 主要是收集AD域日志,方便进行安全日志分析;
? 主要通过ELK栈提供的winlogbeat进行收集
? winlogbeat--logstash--elasticsearch
4.1.1 windows服务器配置
-
下载安装
下载连接地址:https://www.elastic.co/cn/downloads/beats/winlogbeat
-
配置:
将解压后的文件放到“C:Program Files”,重命名为winlogbeat
-
安装 :
命令安装
-
编辑:
编辑winlogbeat.yml文件
winlogbeat.event_logs: - name: Application - name: Security - name: System output.logstash: enbaled: true hosts: ["192.168.20.18:5044"] logging.to_files: true logging.files: path: D:ProgramDatawinlogbeatLogs logging.level: info
-
测试配置文件
PS C:Program FilesWinlogbeat> .winlogbeat.exe test config -c .winlogbeat.yml -e
-
启动:
启动winlogbeat
powershell命令行启动:
PS C:Program FilesWinlogbeat> Start-Service winlogbeat
powershell命令行关闭
PS C:Program FilesWinlogbeat> Stop-Service winlogbeat
-
导入模板
导入winlogindex模板,因为我们使用的logstash,所以需要手动导入。
PS > .winlogbeat.exe setup --index-management -E output.logstash.enabled=false -E ‘output.elasticsearch.hosts=["192.168.20.18"]‘
-
导入dashboard
导入kibana-dashboard,因为我们使用的logstash,所以需要手动导入。
PS > .winlogbeat.exe setup -e -E output.logstash.enabled=false -E output.elasticsearch.hosts=[‘192.168.20.18:9200‘] -E setup.kibana.host=192.168.20.18:5601
4.1.2 参考链接
? 官方手册分析
4.2 添加logstash 配置文件
4.2.1 创建测试配置文件
cat > /data/config/test-windows.config << EOF input { beats { port => 5044 } } output { stdout { codec=> rubydebug } } EOF logstash -f test-windows.config
4.2.2 创建配置文件
创建正式配置文件,查看内容(因为已有模板,所以不错其他修改处理)
cat > /data/config/windows.config << EOF input { beats { port => 5044 } } output { elasticsearch { hosts => ["http:192.168.20.18:9200"] index => "%{[@metadata][beat]}-%{[@metadata][version]}" } } EOF logstash -f windows.config
4.2.3 参考文件
4.3查看Discover
? 因为已经winlogbeat是elk栈的标准模块,已经被定义,所以我们不再自行定义。
? 直接打开搜索winlogbaet*的index。
4.4 查看dashboard
? 因为已经winlogbeat是elk栈的标准模块,已经被定义,所以我们不再自行定义。
? 直接打开搜索winlogbaet*的dashboard
5 Linux server部分
5.1 linux 系统日志收集部分
? 主要是收集linux上的开关机日志,安全日志;
? 主要通过ELK栈提供的filebeat进行收集
? filebeat-filebeat-module--elasticsearch
?
5.1.1 安装部署
-
下载
官网下载,连接地址:https://www.elastic.co/cn/downloads/beats/filebeat
-
安装 :
命令行安装
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.5.0-linux-x86_64.tar.gz tar xzvf filebeat-7.5.0-linux-x86_64.tar.gz
-
查看布局
查看filebeat目录布局
-
编辑:
编辑filebeat.yml文件
filebeat.inputs: - type: log enabled: true paths: - /var/log/*.log #- c:programdataelasticsearchlogs* output.elasticsearch: hosts: ["192.168.20.18:9200"] setup.kibana: host: "192.168.20.18:5601"
-
启动:
运行filebeat文件
filebeat -c filebeat.yml -e
5.1.2 参考链接
5.2 修改filebeat配置文件
5.2.1索引名称
关闭ilm声明周期管理
setup.ilm.enabled: false
更改索引名称
setup.template.overwrite: true output.elasticsearch.index: "systemlog-7.3.0-%{+yyyy.MM.dd}" setup.template.name: "systemlog" setup.template.pattern: "systemlog-*"
修改预先构建的kibana仪表盘
setup.dashboards.index: "systemlog-*"
5.2.1 开启system模块
./filebeat modules enable system ./filebeat modules list
5.2.3 重新初始化环境
./filebeat setup --template -e -c filebeat.yml
5.2.4 配置模块,修改配置文件
filebeat.modules: - module: system syslog: enabled: true #默认位置/var/log/messages* /var/log/syslog* auth: enabled: true #默认位置/var/log/auth.log* /var/log/secure* output.elasticsearch: hosts: ["192.168.20.18:9200"] setup.kibana: host: "192.168.20.18:5601"
5.2.5 重新启动filebeats,初始化模块
./filebeat setup -e -c filebeat.yml
5.2.6 参考文件
5.3 查看discover
5.4 查看dashboard
6 整合logstash配置
6.1 整合logstash配置文件
input{ udp { port => 516 type => "h3c" } } input{ udp { port => 518 type => "hillstone" } } input{ udp { port => 514 type => "vmware" } } input{ udp { port => 1514 type => "vcenter" } } input { beats { port => 5044 type => "windows" } } filter { if [type] == "hillstone" { grok { match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{BASE10NUM:serial}(%{WORD:ROOT}) %{DATA:logid} %{DATA:Sort}@%{DATA:Class}: %{DATA:module}: %{IPV4:srcip}:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}(%{DATA:protocol}), application %{USER:app}, interface %{DATA:interface}, vr %{USER:vr}, policy %{DATA:policy}, user %{USERNAME:user}@%{DATA:AAAserver}, host %{USER:HOST}, send packets %{BASE10NUM:sendPackets},send bytes %{BASE10NUM:sendBytes},receive packets %{BASE10NUM:receivePackets},receive bytes %{BASE10NUM:receiveBytes},start time %{TIMESTAMP_ISO8601:startTime},close time %{TIMESTAMP_ISO8601:closeTime},session %{WORD:state},%{GREEDYDATA:reason}"} match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{BASE10NUM:serial}(%{WORD:ROOT}) %{DATA:logid} %{DATA:Sort}@%{DATA:Class}: %{DATA:module}: %{IPV4:srcip}:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}(%{DATA:protocol}), interface %{DATA:interface}, vr %{DATA:vr}, policy %{DATA:policy}, user %{USERNAME:user}@%{DATA:AAAserver}, host %{USER:HOST}, session %{WORD:state}%{GREEDYDATA:reason}"} match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{BASE10NUM:serial}(%{WORD:ROOT}) %{DATA:logid} %{DATA:Sort}@%{DATA:Class}: %{DATA:module}: %{IPV4:srcip}:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}(%{DATA:protocol}), %{WORD:state} to %{IPV4:snatip}:%{BASE10NUM:snatport}, vr %{DATA:vr}, user %{USERNAME:user}@%{DATA:AAAserver}, host %{DATA:HOST}, rule %{BASE10NUM:rule}"} match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{BASE10NUM:serial}(%{WORD:ROOT}) %{DATA:logid} %{DATA:Sort}@%{DATA:Class}: %{DATA:module}: %{IPV4:srcip}:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}(%{DATA:protocol}), %{WORD:state} to %{IPV4:dnatip}:%{BASE10NUM:dnatport}, vr %{DATA:vr}, user %{USERNAME:user}@%{DATA:AAAserver}, host %{DATA:HOST}, rule %{BASE10NUM:rule}"} } mutate { lowercase => [ "module" ] remove_field => ["host", "message", "ROOT", "HOST", "serial", "syslog_pri", "timestamp", "mac", "AAAserver", "user"] } } if [type] == "h3c" { grok { match => { "message" => "<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{DATA:year} %{DATA:hostname} \%\%%{DATA:ddModuleName}/%{POSINT:severity}/%{DATA:brief}: %{GREEDYDATA:reason}" } add_field => {"severity_code" => "%{severity}"} } mutate { gsub => [ "severity", "0", "Emergency", "severity", "1", "Alert", "severity", "2", "Critical", "severity", "3", "Error", "severity", "4", "Warning", "severity", "5", "Notice", "severity", "6", "Informational", "severity", "7", "Debug" ] remove_field => ["message", "syslog_pri"] } } if [type] == "vmware" { grok { break_on_match => true match => [ "message", "<%{POSINT:syslog_pri}>%{TIMESTAMP_ISO8601:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{SYSLOGPROG:syslog_program}: (?<message-body>(?<message_system_info>(?:[%{DATA:message_thread_id} %{DATA:syslog_level} ‘%{DATA:message_service}‘ ?%{DATA:message_opID}])) [%{DATA:message_service_info}] (?<syslog_message>(%{GREEDYDATA})))", "message", "<%{POSINT:syslog_pri}>%{TIMESTAMP_ISO8601:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{SYSLOGPROG:syslog_program}: (?<message-body>(?<message_system_info>(?:[%{DATA:message_thread_id} %{DATA:syslog_level} ‘%{DATA:message_service}‘ ?%{DATA:message_opID}])) (?<syslog_message>(%{GREEDYDATA})))", "message", "<%{POSINT:syslog_pri}>%{TIMESTAMP_ISO8601:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{SYSLOGPROG:syslog_program}: %{GREEDYDATA:syslog_message}" ] } date { match => [ "syslog_timestamp", "YYYY-MM-ddHH:mm:ss", "ISO8601" ] } mutate { replace => [ "@source_host", "%{syslog_hostname}" ] } mutate { replace => [ "@message", "%{syslog_message}" ] } mutate { remove_field => ["@source_host","program","syslog_hostname","@message"] } } if [type] == "vcenter" { grok { break_on_match => true match => [ "message", "<%{NONNEGINT:syslog_pri}>%{NONNEGINT:syslog_ver} +(?:%{TIMESTAMP_ISO8601:syslog_timestamp}|-) +(?:%{HOSTNAME:syslog_hostname}|-) +(-|%{SYSLOG5424PRINTASCII:syslog_program}) +(-|%{SYSLOG5424PRINTASCII:syslog_proc}) +(-|%{SYSLOG5424PRINTASCII:syslog_msgid}) +(?:%{SYSLOG5424SD:syslog_sd}|-|) +%{GREEDYDATA:syslog_msg}" ] } date { match => [ "syslog_timestamp", "YYYY-MM-ddHH:mm:ss,SSS", "YYYY-MM-dd HH:mm:ss,SSS", "ISO8601" ] } mutate { remove_field => ["syslog_ver", "syslog_pri"] } } } output { if [type] == "hillstone" { elasticsearch { hosts => "192.168.20.18:9200" index => "hillstone-%{module}-%{+YYYY.MM.dd}" } } if [type] == "h3c" { elasticsearch { hosts => "192.168.20.18:9200" index => "h3c-%{+YYYY.MM.dd}" } } if [type] == "vmware" { elasticsearch { hosts => "192.168.20.18:9200" index => "vmware-%{+YYYY.MM.dd}" } } if [type] == "vcenter" { elasticsearch { hosts => "192.168.20.18:9200" index => "vcenter-%{+YYYY.MM.dd}" } } if [type] == "windows" { elasticsearch { hosts => "192.168.20.18:9200" index => "%{[@metadata][beat]}-%{[@metadata][version]}" } } }
6.2 配置logstash服务
启动logstash服务,并把mix.config 更改为logstash.conf ,放到/etc/logstash 目录下。
systemctl enable logstash systemc start logstash
未完待续
推荐查看英文文档方式
找到一篇英文站点,将鼠标移动到url开始,添加icopy.site/回车。
? "icopy.site/"+"https://www.elastic.co"
例如 : 源网址:https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
? 转译后网址:https://s0www0elastic0co.icopy.site/guide/en/logstash/current/plugins-filters-grok.html
推荐理由:
? 1.比谷歌全文翻译更准确,而且关键代码不翻译。
? 2.如果你英文不好,或者看英文文档太累,可以试下哦。
-
? 推荐学习视频:
? ELK入门到实践
以上是关于搭建办公环境ElasticSearch 日志分析系统的主要内容,如果未能解决你的问题,请参考以下文章
ELK(elasticsearch+logstash+kibana)开源日志分析平台搭建
资深架构师教你如何使用elk+redis搭建nginx日志分析平台!