Scenarios covered:
1) datasource->logstash->elasticsearch->kibana
2) datasource->filebeat->logstash->elasticsearch->kibana
3) datasource->filebeat->logstash->redis/kafka->logstash->elasticsearch->kibana
4) kafka->logstash->elasticsearch->kibana
5) datasource->filebeat->kafka->logstash->elasticsearch->kibana (the most commonly used)
6) Filebeat with SSL-encrypted transport
7) datasource->logstash->redis/kafka->logstash->elasticsearch->kibana
8) mysql->logstash->elasticsearch->kibana
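Prerequisites:
1) Java environment: JDK 8.
2) The ELK stack is already set up.
3) Elasticsearch, Kibana, and Logstash should preferably be the same version; this environment uses 5.6.10.
4) Running Logstash as root is recommended, so that it has sufficient permissions to collect the required log files.
5) Elasticsearch is installed as a normal user; newer versions refuse to run as root.
6) Filebeat is already installed.
7) Logstash start command:
nohup ./bin/logstash -f ***.conf --config.reload.automatic >/dev/null 2>/dev/null &
8) Filebeat start command:
nohup ./filebeat -e -c filebeat.yml >/dev/null 2>/dev/null &
9) Elasticsearch start command:
./elasticsearch -d
10) Kibana start command:
nohup ./bin/kibana &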
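I. Simple mode: Logstash as the log collector
Characteristics: this architecture requires Logstash on every server, and Logstash is relatively heavy on CPU and memory. It therefore suits servers with plenty of compute resources; on smaller machines it can degrade performance or even stop the server from working normally.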
# Read from the console
input {
  stdin { }
}

output {
  # Print to the console using the rubydebug codec
  stdout {
    codec => rubydebug
  }

  # Output to Elasticsearch
  elasticsearch {
    hosts => "node18:9200"
    codec => json
  }

  # Output to a file
  file {
    path => "/usr/local/logstash-5.6.10/data/log/logstash/all.log"   # path of the file to write
    flush_interval => 0                                               # flush interval; 0 means write immediately
    codec => json
  }
}
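II. Secure mode: Beats (Filebeat, Metricbeat, Packetbeat, Winlogbeat, etc.) as the log collectors
Topbeat (collects system, process, and filesystem-level data such as CPU and memory usage);
Winlogbeat (collects Windows event log data).
How it works: Beats send the collected data to Logstash; Logstash parses and filters it, sends it to Elasticsearch for storage, and Kibana presents it to the user.
Characteristics: this architecture solves the problem of Logstash consuming a lot of system resources on every server node. Compared with Logstash, the CPU and memory used by Beats are almost negligible. In addition, Beats and Logstash support SSL/TLS-encrypted transport with mutual authentication between client and server, which secures the communication.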
################# Filebeat Configuration Example ########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

#===================== Filebeat prospectors =====================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/admin/helloworld/logs/*.log
    #- c:\programdata\elasticsearch\logs\*

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ["^DBG"]

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ["^ERR", "^WARN"]

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #exclude_files: [".gz$"]

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

  ### Multiline options

  # Multiline can be used for log messages spanning multiple lines. This is common
  # for Java Stack Traces or C-Line Continuation

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
  #multiline.pattern: ^\[

  # Defines if the pattern set under pattern should be negated or not. Default is false.
  #multiline.negate: false

  # Match can be set to "after" or "before". It is used to define if lines should be appended to a pattern
  # that was (not) matched before or after or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent to next in Logstash
  #multiline.match: after

#====================== General =============================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#======================= Outputs ===========================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  # hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#--------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: [""]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

#=========================== Logging =======================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]
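The multiline options in the Filebeat configuration above (pattern, negate, match) decide which lines get folded into the previous event. As a rough illustration only, the following Python sketch mimics the `multiline.match: after` behaviour. The function name `merge_multiline` and the sample log lines are hypothetical, and the sketch ignores Filebeat's other multiline settings such as `multiline.max_lines` and `multiline.timeout`.

```python
import re


def merge_multiline(lines, pattern, negate=False):
    """Group lines roughly the way `multiline.match: after` does.

    A line continues the previous event when it matches `pattern`
    (negate=False) or when it does NOT match it (negate=True).
    """
    regex = re.compile(pattern)
    events = []
    for line in lines:
        matched = regex.search(line) is not None
        is_continuation = matched != negate
        if is_continuation and events:
            # Fold the line into the event that is currently being built.
            events[-1] += "\n" + line
        else:
            # Start a new event.
            events.append(line)
    return events


# Hypothetical log excerpt: a stack trace follows the first timestamped line.
sample = [
    "[2018-07-01 10:00:00] ERROR request failed",
    "java.lang.IllegalStateException: boom",
    "    at com.example.Demo.run(Demo.java:42)",
    "[2018-07-01 10:00:01] INFO recovered",
]

# With negate=True, every line that does not start with "[" is appended
# to the preceding line that does.
events = merge_multiline(sample, r"^\[", negate=True)
for event in events:
    print(repr(event))
```

With `negate=True` the stack trace lines stay attached to the log line that produced them, which is the usual setup for Java logs whose entries start with a bracketed timestamp.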
input {
  beats {
    port => 5044
    codec => "json"
  }
}

#filter {
#  ... (explained later)
#}

output {
  # Output to the console
  # stdout { }

  # Output to Redis
  redis {
    host => ""                  # Redis host address
    port => 6379                # Redis port
    password => "123456"        # Redis password
    #db => 8                    # Redis database number
    data_type => "channel"      # use publish/subscribe mode
    key => "logstash_list_0"    # name of the publish channel
  }

  # Output to Kafka
  kafka {
    bootstrap_servers => ""
    topic_id => "test"
  }

  # Output to Elasticsearch
  elasticsearch {
    hosts => "node18:9200"
    codec => json
  }
}
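III. Message-queue mode: older Beats releases cannot output to a message queue (releases 5.0 and later can), so only Logstash instances can sit on either side of the queue. A Logstash shipper collects data from each source and forwards it to the message queue (Kafka, Redis, RabbitMQ, etc.) without any processing or transformation. A second Logstash then reads from the queue, transforms, analyzes, and filters the data, and writes it to Elasticsearch, where Kibana visualizes it.
Characteristics: this architecture suits large log volumes. Because the Logstash parsing nodes and Elasticsearch carry a heavy load, both can be run as clusters to share it. The message queue evens out network transmission, which reduces network congestion and, above all, the chance of losing data. Logstash still consumes a lot of system resources, however.
Workflow: Filebeat collects -> Logstash forwards to Kafka -> Logstash reads the data buffered in Kafka and analyzes it -> output to Elasticsearch -> displayed in Kibana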
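To make the decoupling concrete, here is a minimal Python sketch of the shipper/queue/indexer split described above. It is an illustration under assumptions, not how Logstash or Kafka are implemented: the `deque` is a hypothetical in-memory stand-in for a Kafka topic or Redis list, and `ship`, `index`, and the sample lines are invented for the example.

```python
from collections import deque


def ship(raw_lines, topic):
    """Shipper stage: forward lines into the buffer without touching them."""
    for line in raw_lines:
        topic.append(line)


def index(topic, batch_size):
    """Indexer stage: pull at most `batch_size` events and parse them."""
    docs = []
    while topic and len(docs) < batch_size:
        level, _, message = topic.popleft().partition(" ")
        docs.append({"level": level, "message": message})
    return docs


# Hypothetical in-memory stand-in for a Kafka topic or a Redis list.
topic = deque()

# The shipper can push a burst of events at its own pace ...
ship(["INFO started", "WARN disk 91% full", "ERROR write failed"], topic)

# ... while the indexer drains the buffer in batches it can handle.
first_batch = index(topic, batch_size=2)
backlog = len(topic)
second_batch = index(topic, batch_size=2)

print(first_batch, backlog, second_batch)
```

The point of the sketch is that the shipper never waits for parsing: events that the indexer cannot process yet simply stay in the buffer as backlog.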
input {
  beats {
    port => 5044
    codec => "json"
  }
  syslog { }
}

#filter {
#
#}

output {
  # Output to the console
  # stdout { }

  # Output to Redis
  redis {
    host => ""                  # Redis host address
    port => 6379                # Redis port
    password => "123456"        # Redis password
    #db => 8                    # Redis database number
    data_type => "channel"      # use publish/subscribe mode
    key => "logstash_list_0"    # name of the publish channel
  }

  # Output to Kafka
  kafka {
    bootstrap_servers => ""
    topic_id => "test"
  }
}
input {
  kafka {
    bootstrap_servers => ""
    topics => ["test"]
    #decorate_events => true
    group_id => "consumer-test"        # consumer group
    auto_offset_reset => "earliest"    # start from the earliest offset (like "from beginning");
                                       # if unset, only messages produced after startup are consumed
  }
}

#filter {
#}

output {
  elasticsearch {
    hosts => ""
    codec => json
  }
}
input {
  kafka {
    bootstrap_servers => ""
    topics => ["test"]
    group_id => "consumer-test"
    #decorate_events => true
    auto_offset_reset => "earliest"
  }
}

#filter {
#
#}

output {
  elasticsearch {
    hosts => ""
    codec => json
  }
}
################# Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

#================== Filebeat prospectors ===========================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/admin/helloworld/logs/*.log
    #- c:\programdata\elasticsearch\logs\*

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ["^DBG"]

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ["^ERR", "^WARN"]

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #exclude_files: [".gz$"]

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

  ### Multiline options

  # Multiline can be used for log messages spanning multiple lines. This is common
  # for Java Stack Traces or C-Line Continuation

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
  #multiline.pattern: ^\[

  # Defines if the pattern set under pattern should be negated or not. Default is false.
  #multiline.negate: false

  # Match can be set to "after" or "before". It is used to define if lines should be appended to a pattern
  # that was (not) matched before or after or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent to next in Logstash
  #multiline.match: after

#============================ General =========================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#======================== Outputs ============================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  # hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  # hosts: [""]

#----------------------------- Kafka output -----------------------------------
#output.kafka:
#  enabled: true
#  hosts: [",,"]
#  topics: 'test'
output.kafka:
  hosts: [""]
  topic: test
  required_acks: 1

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

#======================== Logging ============================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]
input {
  kafka {
    bootstrap_servers => ""
    topics => ["test"]
    group_id => "consumer-test"
    #decorate_events => true
    auto_offset_reset => "earliest"
  }
}

#filter {
#
#}

output {
  elasticsearch {
    hosts => ""
    codec => json
  }
}
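ssl_certificate_authorities => location of the certificate received from the Filebeat side
ssl_certificate => location of the certificate generated on this (Logstash) side
ssl_key => location of the key generated on this side
ssl_verify_mode => "force_peer"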
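As a loose analogy for readers more familiar with TLS in code, the four settings above map onto the steps of building a server-side TLS context that demands a client certificate. The Python sketch below uses only the standard `ssl` module; the helper `build_server_context` and its parameters are hypothetical, it is not how the Logstash beats input is implemented, and the certificate files are only loaded when paths are actually supplied.

```python
import ssl


def build_server_context(cert=None, key=None, client_ca=None):
    """Build a TLS server context that requires a client certificate."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Comparable in spirit to ssl_verify_mode => "force_peer":
    # the handshake fails if the client presents no valid certificate.
    ctx.verify_mode = ssl.CERT_REQUIRED
    if cert and key:
        # Comparable to ssl_certificate / ssl_key (the server's own identity).
        ctx.load_cert_chain(certfile=cert, keyfile=key)
    if client_ca:
        # Comparable to ssl_certificate_authorities (what the server trusts).
        ctx.load_verify_locations(cafile=client_ca)
    return ctx


# No paths are passed here, so nothing is read from disk.
ctx = build_server_context()
print(ctx.verify_mode)
```

Mutual authentication means both sides do this: the Filebeat side trusts the Logstash certificate and presents its own, and the Logstash side does the reverse.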
input {
  beats {
    port => 5044
    codec => "json"
    ssl => true
    ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]
    ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"
    ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"
    ssl_verify_mode => "force_peer"    # (must match ...
  }
  syslog { }
}

output {
  # Output to the console
  # stdout { }

  # Output to Redis
  redis {
    host => ""                  # Redis host address
    port => 6379                # Redis port
    password => "123456"        # Redis password
    #db => 8                    # Redis database number
    data_type => "channel"      # use publish/subscribe mode
    key => "logstash_list_0"    # name of the publish channel
  }

  # Output to Kafka
  kafka {
    bootstrap_servers => ""
    topic_id => "test"
  }

  # Output to Elasticsearch
  elasticsearch {
    hosts => "node18:9200"
    codec => json
  }
}
################# Filebeat Configuration Example #####################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

#=================== Filebeat prospectors ========================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/admin/helloworld/logs/*.log
    #- c:\programdata\elasticsearch\logs\*

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ["^DBG"]

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ["^ERR", "^WARN"]

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #exclude_files: [".gz$"]

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

  ### Multiline options

  # Multiline can be used for log messages spanning multiple lines. This is common
  # for Java Stack Traces or C-Line Continuation

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
  #multiline.pattern: ^\[

  # Defines if the pattern set under pattern should be negated or not. Default is false.
  #multiline.negate: false

  # Match can be set to "after" or "before". It is used to define if lines should be appended to a pattern
  # that was (not) matched before or after or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent to next in Logstash
  #multiline.match: after

#======================== General ============================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#========================= Outputs ===========================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#----------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  # hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: [""]
  # Encrypted transport
  ssl.certificate_authorities: ["/usr/local/filebeat-5.6.10/pki/tls/certs/logstash.crt"]
  ssl.certificate: "/usr/local/filebeat-5.6.10/pki/tls/certs/filebeat.crt"
  ssl.key: "/usr/local/filebeat-5.6.10/pki/tls/private/filebeat.key"

#----------------------------- Kafka output -----------------------------------
#output.kafka:
#  hosts: [""]
#  topic: test
#  required_acks: 1

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

#========================== Logging =========================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]
input {
  file {
    path => [
      # List the files to monitor here
      "/home/admin/helloworld/logs/catalina.out"
    ]
  }
}

output {
  # Output to the console
  # stdout { }

  # Output to Kafka
  kafka {
    bootstrap_servers => ""
    topic_id => "test"
  }
}
input {
  # Read from Redis
  redis {
    host => ""                  # Redis host address
    port => 6379                # Redis port
    password => "123456"        # Redis password
    #db => 8                    # Redis database number
    data_type => "channel"      # use publish/subscribe mode
    key => "logstash_list_0"    # name of the publish channel
  }

  # Read from Kafka
  kafka {
    bootstrap_servers => ""
    topics => ["test"]
    auto_offset_reset => "earliest"
  }
}

output {
  # Output to a file
  file {
    path => "/usr/local/logstash-5.6.10/data/log/logstash/all1.log"   # path of the file to write
    # message_format => "%{host} %{message}"                          # output format
    flush_interval => 0                                                # flush interval; 0 means write immediately
    codec => json
  }

  # Output to Elasticsearch
  elasticsearch {
    hosts => "node18:9200"
    codec => json
  }
}
input {
  stdin { }
  jdbc {
    jdbc_connection_string => "jdbc:mysql://"
    jdbc_user => "fyyq"
    jdbc_password => "fyyq@2017"
    jdbc_driver_library => "/usr/local/logstash-5.6.10/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    statement_filepath => "/usr/local/logstash-5.6.10/mysql2es.sql"
    #schedule => "* * * * *"
  }
}

output {
  stdout {
    codec => json_lines
  }
  elasticsearch {
    hosts => "node18:9200"
    #index => "mainIndex"
    #document_type => "user"
    #document_id => "%{id}"
  }
}
select * from sys_log
input {
  beats {
    port => 5044
    #codec => "json"
    ssl => true
    ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]
    ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"
    ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"
    ssl_verify_mode => "force_peer"
  }
}

filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}

output {
  # Output to the console
  # stdout { }

  # Output to Redis
  redis {
    host => ""                  # Redis host address
    port => 6379                # Redis port
    password => "123456"        # Redis password
    #db => 8                    # Redis database number
    data_type => "channel"      # use publish/subscribe mode
    key => "logstash_list_0"    # name of the publish channel
  }

  # Output to Kafka
  kafka {
    bootstrap_servers => ""
    topic_id => "test"
  }

  # Output to Elasticsearch
  elasticsearch {
    hosts => "node18:9200"
    codec => json
  }

  # Output to HDFS
  webhdfs {
    host => ""
    port => 50070
    path => "/user/logstash/dt=%{+YYYY-MM-dd}/%{@source_host}-%{+HH}.log"
    user => "hadoop"
  }
}
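The grok filter in the configuration above splits each `message` into the named fields `client`, `method`, `request`, `bytes`, and `duration`. The Python sketch below shows roughly what that match does, using a plain regular expression. The sub-patterns are simplified stand-ins and an assumption on my part: the real grok definitions of IP, WORD, URIPATHPARAM, and NUMBER are more permissive (IPv6 addresses, signed numbers, and so on). The helper `parse_line` and the sample log line are hypothetical.

```python
import re

# Simplified approximations of the grok patterns, one named group per field.
GROK_LINE = re.compile(
    r"(?P<client>\d{1,3}(?:\.\d{1,3}){3}) "   # ~ %{IP:client} (IPv4 only here)
    r"(?P<method>\w+) "                       # ~ %{WORD:method}
    r"(?P<request>/\S*) "                     # ~ %{URIPATHPARAM:request}
    r"(?P<bytes>\d+(?:\.\d+)?) "              # ~ %{NUMBER:bytes}
    r"(?P<duration>\d+(?:\.\d+)?)"            # ~ %{NUMBER:duration}
)


def parse_line(message):
    """Return the captured fields as a dict, or None when the line does not match."""
    match = GROK_LINE.match(message)
    return match.groupdict() if match else None


# Hypothetical access-log style line.
fields = parse_line("55.3.244.1 GET /index.html 15824 0.043")
print(fields)

# A line with a different shape yields no fields at all.
unparsed = parse_line("something else entirely")
print(unparsed)
```

As in grok, the captured values are strings unless converted explicitly. When a line does not match, Logstash keeps the event and adds a `_grokparsefailure` tag instead of returning nothing, so unparsed events can still be routed or inspected.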
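Setting | Input type | Required
 |  | No (defaults to {})
 |  | No (codec for the input data; defaults to "plain")
 |  | No (defaults to true)
 |  | No (generated automatically, but best defined explicitly)
 |  | No
 |  | No
json (JSON codec)
msgpack (MessagePack codec)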
1. beats input: receives events from the Elastic Beats framework
Setting | Input type | Required
 |  | No
 |  | No
 |  | No
 |  | No
 |  | Yes (required)
 |  | No
 | a valid filesystem path | No
 |  | No
 |  | No
 | a valid filesystem path | No
 |  | No
 | string, one of | No