Logstash usage scenarios (all configuration files tested in practice)


Scenarios:

1) datasource->logstash->elasticsearch->kibana

2) datasource->filebeat->logstash-> elasticsearch->kibana

3) datasource->filebeat->logstash->redis/kafka->logstash-> elasticsearch->kibana

4) kafka->logstash-> elasticsearch->kibana

5) datasource->filebeat->kafka->logstash->elasticsearch->kibana (most common)

6) Filebeat SSL encrypted transport

7) datasource->logstash->redis/kafka->logstash->elasticsearch->kibana

8) mysql->logstash->elasticsearch->kibana

The list above summarizes the transport and processing scenarios covered below: starting from the data source, how data is collected, with which tool, where it is sent, how it is processed and filtered, where it is forwarded, and how it is finally visualized.

 

Input, output, and filtering are implemented through plugins (of many types); see the official plugin documentation:

https://www.elastic.co/guide/en/logstash/current/index.html

[Installation and deployment are well covered by the official site and the community, so they are not repeated here.]

P.S. [Redis cluster installation was documented in an earlier post; refer to it if needed.]

 

Prerequisites

1) Java environment: JDK 8;

2) the ELK stack is already deployed;

3) the Elasticsearch, Kibana, and Logstash versions should match; this environment runs 5.6.10;

4) run Logstash as root (so it has sufficient permissions to collect the required log files);

5) install Elasticsearch as a non-root user (recent versions refuse to run as root);

6) Filebeat is installed.

Startup commands:

7) Logstash:

nohup ./bin/logstash -f ***.conf --config.reload.automatic >/dev/null 2>/dev/null &

8) Filebeat: nohup ./filebeat -e -c filebeat.yml >/dev/null 2>/dev/null &

9) Elasticsearch: ./elasticsearch -d

10) Kibana: nohup ./bin/kibana &

 

Logstash: --config.reload.automatic reloads the configuration file automatically, so no Logstash restart is needed.

Filebeat: -e writes the log output to stderr; -c specifies the path of the configuration file.

 

Scenario walkthroughs

I. Simple mode: Logstash as the log collector

Architecture: Logstash collects, processes, and forwards data to Elasticsearch for storage; Kibana handles the visualization.

Characteristics: this structure requires deploying Logstash on every server, and Logstash is relatively CPU- and memory-hungry, so it suits servers with ample compute resources; otherwise it can degrade server performance or even prevent the server from working properly.

 

Demo1:

test1.conf:

Reads from the console, applies no processing or transformation (pure transport), and writes to the console (or to Elasticsearch or a file; pick whichever you need):

# console input
input { stdin { } }

output {
    # print to the console
    stdout { codec => rubydebug }

    # write to Elasticsearch
    elasticsearch {
        hosts => "node18:9200"
        codec => json
    }

    # write to a file
    file {
        path => "/usr/local/logstash-5.6.10/data/log/logstash/all.log"   # target file path
        flush_interval => 0    # flush interval; 0 means write in real time
        codec => json
    }
}
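
To try it out, start Logstash with this file and type a line at the prompt; the rubydebug codec prints the full event to the console. A sample session (the host and timestamp will of course differ in your environment):

./bin/logstash -f test1.conf
hello world
{
      "@version" => "1",
          "host" => "node18",
    "@timestamp" => 2018-07-09T08:15:30.123Z,
       "message" => "hello world"
}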

II. Secure mode: Beats (Filebeat, Metricbeat, Packetbeat, Winlogbeat, etc.) as the log shipper

Packetbeat (collects network traffic data);

Topbeat (collects system-, process-, and filesystem-level CPU and memory usage data);

Filebeat (collects file data), the most commonly used;

Winlogbeat (collects Windows event log data).

Architecture:

 

How it works: Beats ship the collected data to Logstash, which parses and filters it, then sends it to Elasticsearch for storage; Kibana presents it to the user.

Characteristics: this architecture solves the problem of Logstash consuming too many system resources on each server node. Compared to Logstash, the CPU and memory footprint of Beats is almost negligible. In addition, Beats and Logstash support SSL/TLS encrypted transport with mutual client/server authentication, which secures the communication channel.

This architecture therefore suits scenarios with high data-security requirements where per-server performance is also a concern.

Demo2:

filebeat.yml:

################# Filebeat Configuration Example ########################

 

# This file is an example configuration file highlighting only the most common

# options. The filebeat.full.yml file from the same directory contains all the

# supported options with more comments. You can use it as a reference.

#

# You can find the full configuration reference here:

# https://www.elastic.co/guide/en/beats/filebeat/index.html

 

#===================== Filebeat prospectors =====================

 

filebeat.prospectors:

 

# Each - is a prospector. Most options can be set at the prospector level, so

# you can use different prospectors for various configurations.

# Below are the prospector specific configurations.

 

- input_type: log

 

  # Paths that should be crawled and fetched. Glob based paths.

  paths:

    - /home/admin/helloworld/logs/*.log

    #- c:\programdata\elasticsearch\logs\*

 

  # Exclude lines. A list of regular expressions to match. It drops the lines that are

  # matching any regular expression from the list.

  #exclude_lines: ["^DBG"]

 

  # Include lines. A list of regular expressions to match. It exports the lines that are

  # matching any regular expression from the list.

  #include_lines: ["^ERR", "^WARN"]

 

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that

  # are matching any regular expression from the list. By default, no files are dropped.

  #exclude_files: [".gz$"]

 

  # Optional additional fields. These field can be freely picked

  # to add additional information to the crawled log files for filtering

  #fields:

  #  level: debug

  #  review: 1

 

  ### Multiline options

 

  # Mutiline can be used for log messages spanning multiple lines. This is common

  # for Java Stack Traces or C-Line Continuation

 

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [

  #multiline.pattern: ^\[

 

  # Defines if the pattern set under pattern should be negated or not. Default is false.

  #multiline.negate: false

 

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern

  # that was (not) matched before or after or as long as a pattern is not matched based on negate.

  # Note: After is the equivalent to previous and before is the equivalent to to next in Logstash

  #multiline.match: after

 

 

#====================== General =============================

 

# The name of the shipper that publishes the network data. It can be used to group

# all the transactions sent by a single shipper in the web interface.

#name:

 

# The tags of the shipper are included in their own field with each

# transaction published.

#tags: ["service-X", "web-tier"]

 

# Optional fields that you can specify to add additional information to the

# output.

#fields:

#  env: staging

 

#======================= Outputs ===========================

 

# Configure what outputs to use when sending the data collected by the beat.

# Multiple outputs may be used.

 

#-------------------------- Elasticsearch output ------------------------------

#output.elasticsearch:

  # Array of hosts to connect to.

  # hosts: ["localhost:9200"]

 

  # Optional protocol and basic auth credentials.

  #protocol: "https"

  #username: "elastic"

  #password: "changeme"

 

#--------------------------- Logstash output --------------------------------

output.logstash:

  # The Logstash hosts

  hosts: ["192.168.80.34:5044"]

 

  # Optional SSL. By default is off.

  # List of root certificates for HTTPS server verifications

  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

 

  # Certificate for SSL client authentication

  #ssl.certificate: "/etc/pki/client/cert.pem"

 

  # Client Certificate Key

  #ssl.key: "/etc/pki/client/cert.key"

 

#=========================== Logging =======================

 

# Sets log level. The default log level is info.

# Available log levels are: critical, error, warning, info, debug

#logging.level: debug

 

# At debug level, you can selectively enable logging only for some components.

# To enable all selectors use ["*"]. Examples of other selectors are "beat",

# "publish", "service".

#logging.selectors: ["*"]
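
Before starting the shipper, the configuration can be sanity-checked first; Filebeat 5.x accepts a -configtest flag (newer releases replace it with the "test config" subcommand):

./filebeat -configtest -c filebeat.yml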

 

 

On the .34 server (192.168.80.34), test2.conf:

input {
    beats {
        port => 5044
        codec => "json"
    }
}

#filter {
#    …………(covered in a later section)
#}

output {
    # print to the console
    # stdout { }

    # write to Redis
    redis {
        host => "192.168.80.32"       # Redis host address
        port => 6379                  # Redis port
        password => "123456"          # Redis password
        #db => 8                      # Redis database number
        data_type => "channel"        # use publish/subscribe mode
        key => "logstash_list_0"      # name of the channel to publish to
    }

    # write to Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topic_id          => "test"
    }

    # write to Elasticsearch
    elasticsearch {
        hosts => "node18:9200"
        codec => json
    }
}
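
With Filebeat and this pipeline both running, two quick checks confirm that events are flowing (host names follow this demo; adjust to your environment):

# on the Logstash server: is the beats input listening on 5044?
ss -lntp | grep 5044

# against Elasticsearch: have indices been created, and are they growing?
curl 'http://node18:9200/_cat/indices?v'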

III. Message-queue mode: Beats did not originally support writing to a message queue (except in newer versions, 5.0 and above), so only Logstash instances can sit on either side of the queue. Logstash collects data from each source and forwards it, without any processing or transformation, to a message queue (Kafka, Redis, RabbitMQ, etc.); a downstream Logstash then pulls from the queue, transforms, analyzes, and filters the data, and outputs it to Elasticsearch, where Kibana provides graphical display.

Architecture (the server running the Logstash parsing node must have sufficiently strong all-around performance):

 

Characteristics: this architecture suits large log volumes. Because the Logstash parsing nodes and Elasticsearch both carry a heavy load, they can be run as clusters to share it. The message queue evens out network transfer, reducing congestion and, above all, the chance of losing data; however, Logstash still consumes a lot of system resources.

Workflow: Filebeat collects -> Logstash forwards into Kafka -> a second Logstash pulls the buffered data from Kafka and analyzes it -> output to ES -> displayed in Kibana

Msg1.conf:

input {
    beats {
        port => 5044
        codec => "json"
    }
    syslog {
    }
}

#filter {
#
#}

output {
    # print to the console
    # stdout { }

    # write to Redis
    redis {
        host => "192.168.80.32"       # Redis host address
        port => 6379                  # Redis port
        password => "123456"          # Redis password
        #db => 8                      # Redis database number
        data_type => "channel"        # use publish/subscribe mode
        key => "logstash_list_0"      # name of the channel to publish to
    }

    # write to Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topic_id          => "test"
    }
}

Msg2.conf:

input {
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topics            => ["test"]
        group_id          => "consumer-test"   # consumer group
        #decorate_events  => true
        auto_offset_reset => "earliest"        # start from the earliest offset (like --from-beginning); if unset, only messages produced after startup are consumed
    }
}

#filter {
#}

output {
    elasticsearch {
        hosts => "192.168.80.18:9200"
        codec => json
    }
}
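
Before pointing Msg2.conf at the topic, it is worth confirming that Msg1.conf is really publishing to Kafka. The console consumer that ships with Kafka can tail the topic (run from the Kafka installation directory):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.80.42:9092 --topic test --from-beginning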

IV. Logstash reads directly from the Kafka queue, processes the data, and outputs to ES (the data is pulled straight out of Kafka, i.e. it is already buffered, so Logstash just processes it and writes the result out to a file, ES, and so on)

How it works: [the data already exists in the relevant Kafka topic] a standalone Logstash reads from Kafka, processes the events, outputs them to ES, and Kibana displays them.

input {
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topics            => ["test"]
        group_id          => "consumer-test"
        #decorate_events  => true
        auto_offset_reset => "earliest"
    }
}

#filter {
#
#}

output {
    elasticsearch {
        hosts => "192.168.80.18:9200"
        codec => json
    }
}

V. Newer Filebeat versions (5.0 and above) support output directly to Kafka, so no intermediate Logstash is needed to receive and forward the data

 

Filebeat ships straight into the Kafka message queue; Logstash then pulls the data out, processes and analyzes it, outputs to ES, and Kibana displays it.

filebeat.yml:

################# Filebeat Configuration Example #########################

 

# This file is an example configuration file highlighting only the most common

# options. The filebeat.full.yml file from the same directory contains all the

# supported options with more comments. You can use it as a reference.

#

# You can find the full configuration reference here:

# https://www.elastic.co/guide/en/beats/filebeat/index.html

 

#================== Filebeat prospectors===========================

 

filebeat.prospectors:

 

# Each - is a prospector. Most options can be set at the prospector level, so

# you can use different prospectors for various configurations.

# Below are the prospector specific configurations.

 

- input_type: log

 

  # Paths that should be crawled and fetched. Glob based paths.

  paths:

    - /home/admin/helloworld/logs/*.log

    #- c:\programdata\elasticsearch\logs\*

 

  # Exclude lines. A list of regular expressions to match. It drops the lines that are

  # matching any regular expression from the list.

  #exclude_lines: ["^DBG"]

 

  # Include lines. A list of regular expressions to match. It exports the lines that are

  # matching any regular expression from the list.

  #include_lines: ["^ERR", "^WARN"]

 

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that

  # are matching any regular expression from the list. By default, no files are dropped.

  #exclude_files: [".gz$"]

 

  # Optional additional fields. These field can be freely picked

  # to add additional information to the crawled log files for filtering

  #fields:

  #  level: debug

  #  review: 1

 

  ### Multiline options

 

  # Mutiline can be used for log messages spanning multiple lines. This is common

  # for Java Stack Traces or C-Line Continuation

 

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [

  #multiline.pattern: ^\[

 

  # Defines if the pattern set under pattern should be negated or not. Default is false.

  #multiline.negate: false

 

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern

  # that was (not) matched before or after or as long as a pattern is not matched based on negate.

  # Note: After is the equivalent to previous and before is the equivalent to to next in Logstash

  #multiline.match: after

 

#============================ General=========================

 

# The name of the shipper that publishes the network data. It can be used to group

# all the transactions sent by a single shipper in the web interface.

#name:

 

# The tags of the shipper are included in their own field with each

# transaction published.

#tags: ["service-X", "web-tier"]

 

# Optional fields that you can specify to add additional information to the

# output.

#fields:

#  env: staging

 

#======================== Outputs ============================

 

# Configure what outputs to use when sending the data collected by the beat.

# Multiple outputs may be used.

 

#-------------------------- Elasticsearch output ------------------------------

#output.elasticsearch:

  # Array of hosts to connect to.

  # hosts: ["localhost:9200"]

 

  # Optional protocol and basic auth credentials.

  #protocol: "https"

  #username: "elastic"

  #password: "changeme"

 

#----------------------------- Logstash output --------------------------------

#output.logstash:

  # The Logstash hosts

#  hosts: ["192.168.80.34:5044"]

 

#-----------------------------kafka  output-----------------------------------

#output.kafka:

#  enabled: true

#  hosts: ["192.168.80.42:9092,192.168.80.43:9092,192.168.80.44:9092"]

#  topic: 'test'

output.kafka:

  hosts: ["192.168.80.42:9092"]

  topic: test

  required_acks: 1

 

 

  # Optional SSL. By default is off.

  # List of root certificates for HTTPS server verifications

  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

 

  # Certificate for SSL client authentication

  #ssl.certificate: "/etc/pki/client/cert.pem"

 

  # Client Certificate Key

  #ssl.key: "/etc/pki/client/cert.key"

 

#======================== Logging ============================

 

# Sets log level. The default log level is info.

# Available log levels are: critical, error, warning, info, debug

#logging.level: debug

 

# At debug level, you can selectively enable logging only for some components.

# To enable all selectors use ["*"]. Examples of other selectors are "beat",

# "publish", "service".

#logging.selectors: ["*"]

 

logstash.conf:

input {
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topics            => ["test"]
        group_id          => "consumer-test"
        #decorate_events  => true
        auto_offset_reset => "earliest"
    }
}

#filter {
#
#}

output {
    elasticsearch {
        hosts => "192.168.80.18:9200"
        codec => json
    }
}

 

VI. SSL encrypted transport (improves security: only Filebeat and Logstash servers configured with the matching keys and certificates can exchange log data):

Reference: https://blog.csdn.net/zsq12138/article/details/78753369

Reference: https://blog.csdn.net/Gamer_gyt/article/details/69280693?locationNum=5&fps=1

Logstash configuration:

Notes:

ssl_certificate_authorities: location of the certificate received from the Filebeat side

ssl_certificate: location of the certificate generated on this (Logstash) side

ssl_key: location of the private key generated on this side

ssl_verify_mode => "force_peer"
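
Both sides need a key pair and certificate before the configs below will work. A minimal sketch using self-signed certificates generated with openssl (paths, validity period, and CN values are assumptions; note that for Beats to verify a server certificate by IP address, the certificate generally also needs a matching subjectAltName entry):

# on the Logstash server
openssl req -x509 -newkey rsa:2048 -nodes -days 365 \
    -keyout logstash.key -out logstash.crt -subj "/CN=192.168.80.18"

# on the Filebeat server
openssl req -x509 -newkey rsa:2048 -nodes -days 365 \
    -keyout filebeat.key -out filebeat.crt -subj "/CN=192.168.80.34"

# then exchange the .crt files between the two hosts and move everything
# to the paths referenced in the configurations below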

beat.conf:

input {
    beats {
        port => 5044
        codec => "json"
        ssl => true
        ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]
        ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"
        ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"
        ssl_verify_mode => "force_peer"   # must be used together with ssl_certificate_authorities
    }
    syslog {
    }
}

 

output {
    # print to the console
    # stdout { }

    # write to Redis
    redis {
        host => "192.168.80.32"       # Redis host address
        port => 6379                  # Redis port
        password => "123456"          # Redis password
        #db => 8                      # Redis database number
        data_type => "channel"        # use publish/subscribe mode
        key => "logstash_list_0"      # name of the channel to publish to
    }

    # write to Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topic_id          => "test"
    }

    # write to Elasticsearch
    elasticsearch {
        hosts => "node18:9200"
        codec => json
    }
}

 

Filebeat configuration file:

filebeat.yml:

################# Filebeat Configuration Example #####################

 

# This file is an example configuration file highlighting only the most common

# options. The filebeat.full.yml file from the same directory contains all the

# supported options with more comments. You can use it as a reference.

#

# You can find the full configuration reference here:

# https://www.elastic.co/guide/en/beats/filebeat/index.html

 

#=================== Filebeat prospectors ========================

 

filebeat.prospectors:

 

# Each - is a prospector. Most options can be set at the prospector level, so

# you can use different prospectors for various configurations.

# Below are the prospector specific configurations.

 

- input_type: log

 

  # Paths that should be crawled and fetched. Glob based paths.

  paths:

    - /home/admin/helloworld/logs/*.log

    #- c:\programdata\elasticsearch\logs\*

 

  # Exclude lines. A list of regular expressions to match. It drops the lines that are

  # matching any regular expression from the list.

  #exclude_lines: ["^DBG"]

 

  # Include lines. A list of regular expressions to match. It exports the lines that are

  # matching any regular expression from the list.

  #include_lines: ["^ERR", "^WARN"]

 

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that

  # are matching any regular expression from the list. By default, no files are dropped.

  #exclude_files: [".gz$"]

 

  # Optional additional fields. These field can be freely picked

  # to add additional information to the crawled log files for filtering

  #fields:

  #  level: debug

  #  review: 1

 

  ### Multiline options

 

  # Mutiline can be used for log messages spanning multiple lines. This is common

  # for Java Stack Traces or C-Line Continuation

 

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [

  #multiline.pattern: ^\[

 

  # Defines if the pattern set under pattern should be negated or not. Default is false.

  #multiline.negate: false

 

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern

  # that was (not) matched before or after or as long as a pattern is not matched based on negate.

  # Note: After is the equivalent to previous and before is the equivalent to to next in Logstash

  #multiline.match: after

 

#======================== General ============================

 

# The name of the shipper that publishes the network data. It can be used to group

# all the transactions sent by a single shipper in the web interface.

#name:

 

# The tags of the shipper are included in their own field with each

# transaction published.

#tags: ["service-X", "web-tier"]

 

# Optional fields that you can specify to add additional information to the

# output.

#fields:

#  env: staging

 

#========================= Outputs ===========================

 

# Configure what outputs to use when sending the data collected by the beat.

# Multiple outputs may be used.

 

#----------------------------- Elasticsearch output ------------------------------

#output.elasticsearch:

  # Array of hosts to connect to.

  # hosts: ["localhost:9200"]

 

  # Optional protocol and basic auth credentials.

  #protocol: "https"

  #username: "elastic"

  #password: "changeme"

 

#----------------------------- Logstash output --------------------------------

output.logstash:

# The Logstash hosts

  hosts: ["192.168.80.18:5044"]

# encrypted transport

  ssl.certificate_authorities: ["/usr/local/filebeat-5.6.10/pki/tls/certs/logstash.crt"]

  ssl.certificate: "/usr/local/filebeat-5.6.10/pki/tls/certs/filebeat.crt"

  ssl.key: "/usr/local/filebeat-5.6.10/pki/tls/private/filebeat.key" 

 

#----------------------------- kafka  output-----------------------------------

#output.kafka:

#  hosts: ["192.168.80.42:9092"]

#  topic: test

#  required_acks: 1

 

  # Optional SSL. By default is off.

  # List of root certificates for HTTPS server verifications

  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

 

  # Certificate for SSL client authentication

  #ssl.certificate: "/etc/pki/client/cert.pem"

 

  # Client Certificate Key

  #ssl.key: "/etc/pki/client/cert.key"

 

#========================== Logging =========================

 

# Sets log level. The default log level is info.

# Available log levels are: critical, error, warning, info, debug

#logging.level: debug

 

# At debug level, you can selectively enable logging only for some components.

# To enable all selectors use ["*"]. Examples of other selectors are "beat",

# "publish", "service".

#logging.selectors: ["*"]

VII. Logstash (instead of Filebeat) collects the files, writes them into Kafka as a buffer, then reads back from Kafka, processes the data, and outputs it to a file or to ES

Producing data (collecting files into Kafka):

kafkaput.conf:

input {
    file {
        path => [
            # files to monitor
            "/home/admin/helloworld/logs/catalina.out"
        ]
    }
}

output {
    # print to the console
    # stdout { }

    # write to Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topic_id          => "test"
    }
}
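
One caveat with this file input: by default Logstash tails files from the end, so lines already present in catalina.out are not shipped. When testing, the input can be extended to replay a file from the start (a sketch; sincedb_path => "/dev/null" stops Logstash from persisting its read position and should only be used for testing):

input {
    file {
        path => ["/home/admin/helloworld/logs/catalina.out"]
        start_position => "beginning"   # read from the start of the file
        sincedb_path   => "/dev/null"   # do not remember the read offset (testing only)
    }
}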

 

Consuming data (from Redis or Kafka):

indexer.conf:

input {
    # read from Redis
    redis {
        host => "192.168.80.32"       # Redis host address
        port => 6379                  # Redis port
        password => "123456"          # Redis password
        #db => 8                      # Redis database number
        data_type => "channel"        # use publish/subscribe mode
        key => "logstash_list_0"      # name of the channel to subscribe to
    }

    # read from Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topics            => ["test"]
        auto_offset_reset => "earliest"
    }
}

 

output {
    # write to a file
    file {
        path => "/usr/local/logstash-5.6.10/data/log/logstash/all1.log"   # target file path
        #message_format => "%{host} %{message}"   # output format
        flush_interval => 0    # flush interval; 0 means write in real time
        codec => json
    }

    # write to Elasticsearch
    elasticsearch {
        hosts => "node18:9200"
        codec => json
    }
}

VIII. Logstash syncs a MySQL database into ES (Logstash 5 and above bundle the JDBC plugin; no separate download or installation is needed)

mysql2es.conf:

input {
    stdin { }
    jdbc {
        jdbc_connection_string => "jdbc:mysql://192.168.80.18:3306/fyyq-mysql"
        jdbc_user => "fyyq"
        jdbc_password => "fyyq@2017"
        jdbc_driver_library => "/usr/local/logstash-5.6.10/mysql-connector-java-5.1.46.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        statement_filepath => "/usr/local/logstash-5.6.10/mysql2es.sql"
        #schedule => "* * * * *"
    }
}

output {
    stdout {
        codec => json_lines
    }
    elasticsearch {
        hosts => "node18:9200"
        #index => "mainIndex"
        #document_type => "user"
        #document_id => "%{id}"
    }
}

 

mysql2es.sql:

 

select * from sys_log
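
As written, the query reloads the whole sys_log table on every run. For a scheduled incremental sync, the jdbc input can track a column and expose its last seen value to the SQL statement as :sql_last_value. A sketch (the id tracking column is an assumption based on a typical auto-increment primary key):

input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://192.168.80.18:3306/fyyq-mysql"
        jdbc_user => "fyyq"
        jdbc_password => "fyyq@2017"
        jdbc_driver_library => "/usr/local/logstash-5.6.10/mysql-connector-java-5.1.46.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        schedule => "* * * * *"    # run the query once a minute
        use_column_value => true
        tracking_column => "id"    # assumed auto-increment primary key of sys_log
        statement => "select * from sys_log where id > :sql_last_value"
    }
}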

 

IX. Logstash output to HDFS files

input {
    beats {
        port => 5044
        #codec => "json"
        ssl => true
        ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]
        ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"
        ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"
        ssl_verify_mode => "force_peer"
    }
}

 

filter {
    grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
    }
}
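
This is the standard example pattern from the grok documentation; a log line shaped like the one below parses cleanly into the fields client, method, request, bytes, and duration:

# input line:
#   55.3.244.1 GET /index.html 15824 0.043
# extracted fields:
#   client   => 55.3.244.1
#   method   => GET
#   request  => /index.html
#   bytes    => 15824
#   duration => 0.043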

 

 

output {
    # print to the console
    # stdout { }

    # write to Redis
    redis {
        host => "192.168.80.32"       # Redis host address
        port => 6379                  # Redis port
        password => "123456"          # Redis password
        #db => 8                      # Redis database number
        data_type => "channel"        # use publish/subscribe mode
        key => "logstash_list_0"      # name of the channel to publish to
    }

    # write to Kafka
    kafka {
        bootstrap_servers => "192.168.80.42:9092"
        topic_id          => "test"
    }

    # write to Elasticsearch
    elasticsearch {
        hosts => "node18:9200"
        codec => json
    }

    # write to HDFS
    webhdfs {
        host => "192.168.80.42"   # HDFS namenode host
        port => 50070             # webhdfs port (namenode HTTP port)
        path => "/user/logstash/dt=%{+YYYY-MM-dd}/%{@source_host}-%{+HH}.log"
        user => "hadoop"
    }
}
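
Once events flow, the result can be checked from the Hadoop side (assuming the hdfs CLI is on the PATH for the hadoop user; the dt= value is whatever date the events carried):

hdfs dfs -ls /user/logstash/
hdfs dfs -cat '/user/logstash/dt=2018-07-09/*.log' | head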

X. Overview of Logstash input plugins and their parameters

Only the beats plugin is covered here as an example; the remaining plugins will be provided as links (all standard official documentation).

All input plugins support the following configuration options:

Setting          Input type    Required
add_field        hash          No (default: {})
codec            codec         No (codec for incoming data; default: "plain")
enable_metric    boolean       No (default: true)
id               string        No (auto-generated, but defining your own is recommended)
tags             array         No
type             string        No

codec options:

json (JSON codec)

msgpack (msgpack codec)

plain (plain-text codec)

multiline (merges multiple lines of text into a single event, e.g. collapsing a Java exception stack trace into one message; see the sketch below)
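
A sketch of the multiline codec on a file input, merging a Java stack trace into the event that starts it (the pattern assumes continuation lines begin with whitespace or "Caused by", which is typical but not universal):

input {
    file {
        path => ["/home/admin/helloworld/logs/catalina.out"]
        codec => multiline {
            pattern => "^(\s|Caused by)"   # lines starting with whitespace or "Caused by"...
            what => "previous"             # ...are appended to the previous event
        }
    }
}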

 

Common input plugins:

1. beats input: receives events from the Elastic Beats framework

Settings:

Setting                       Input type                                       Required
cipher_suites                 array                                            No
client_inactivity_timeout     number                                           No
host                          string                                           No
include_codec_tag             boolean                                          No
port                          number                                           Yes (required)
ssl                           boolean                                          No
ssl_certificate               a valid filesystem path                          No
ssl_certificate_authorities   array                                            No
ssl_handshake_timeout         number                                           No
ssl_key                       a valid filesystem path                          No
ssl_key_passphrase            password                                         No
ssl_verify_mode               string, one of ["none", "peer", "force_peer"]    No
tls_max_version               number                                           No
