ES Log Storage: An Introduction to Logstash


This article is a translation of the official Elasticsearch documentation for Logstash; you can also read the original at https://www.elastic.co/guide/en/logstash/current/index.html

Note: Logstash literally means "log storage"; the English name is used throughout the rest of this article.

Logstash is an open source data collection engine with real-time pipelining capabilities. Logstash can dynamically unify data from disparate sources and normalize the data into destinations of your choice. Cleanse and democratize all your data for diverse advanced downstream analytics and visualization use cases.

While Logstash originally drove innovation in log collection, its capabilities extend well beyond that use case. Any type of event can be enriched and transformed with a broad array of input, filter, and output plugins, with many native codecs further simplifying the ingestion process. Logstash accelerates your insights by harnessing a greater volume and variety of data.

The ingestion workhorse for Elasticsearch and more
Horizontally scalable data processing pipeline with strong Elasticsearch and Kibana synergy

Pluggable pipeline architecture
Mix, match, and orchestrate different inputs, filters, and outputs to play in pipeline harmony

Community-extensible and developer-friendly plugin ecosystem
Over 200 plugins available, plus the flexibility of creating and contributing your own

Collect more, so you can know more. Logstash welcomes data of all shapes and sizes.

Logs and Metrics
Where it all started.

The Web
Unlock the World Wide Web.

Data Stores and Streams
Discover more value from the data you already own.

Sensors and IoT
Explore an expansive breadth of other data.

The better the data, the better the knowledge. Clean and transform your data during ingestion to gain near real-time insights immediately at index or output time. Logstash comes out-of-box with many aggregations and mutations along with pattern matching, geo mapping, and dynamic lookup capabilities.
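As a hedged illustration of those ingestion-time transforms, here is a minimal filter sketch that combines the standard grok, geoip, and mutate plugins; the access-log format and field names (clientip, bytes) are assumptions for the example and are unrelated to the pipeline built later in this article.

filter {
  # Pattern matching: parse an Apache combined access-log line into named fields
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # Geo mapping: enrich the event with location data for the parsed client IP
  geoip {
    source => "clientip"
  }
  # Mutation: convert the bytes field to an integer so it can be aggregated at index time
  mutate {
    convert => { "bytes" => "integer" }
  }
}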

Route your data where it matters most. Unlock various downstream analytical and operational use cases by storing, analyzing, and taking action on your data.

Analysis
Elasticsearch
Data stores such as MongoDB and Riak

Archiving
HDFS
S3
Google Cloud Storage

Monitoring
Nagios
Ganglia
Zabbix
Graphite
Datadog
CloudWatch

Alerting
Watcher with Elasticsearch
Email
Pagerduty
HipChat
IRC
SNS

Using Logstash to listen for logback output and store the logs in ES

First, deploy ES

Download -> extract -> edit the config -> run

Download, installation, and deployment are covered in another of my articles:
https://blog.csdn.net/weixin_43944305/article/details/118469227
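Once ES is running, a quick sanity check (assuming the default address localhost:9200) should return the cluster name and version info:

curl http://localhost:9200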

Next, deploy Logstash
Download
https://www.elastic.co/cn/downloads/past-releases#logstash

Extract

tar -zxvf logstash-6.3.0.tar.gz -C ~/app/
mv logstash-6.3.0 logstash

Tweak the JVM config; my small server can't handle the defaults.

/home/ubuntu/app/logstash/config/jvm.options
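For example, the heap can be lowered in jvm.options; the 256m figures below are just an assumed value for a small box, not a recommendation:

-Xms256m
-Xmx256m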

Edit logstash.yml to enable hot reloading of the custom pipeline config file.

# Set the node name
node.name: logstash-01
# Set the pipeline id
pipeline.id: main
pipeline.ordered: auto
# Set the number of pipeline worker threads (filter+output); the default is the number of CPU cores
# pipeline.workers: 2
# Set where the main pipeline config files live
# path.config:
# Automatically reload the pipeline config files when they change
config.reload.automatic: true
# Set the ip and port the HTTP API binds to
http.host: 127.0.0.1
http.port: 9201
# Set the Logstash queue type to persisted; the default is memory
queue.type: persisted

Create a custom pipeline config file under /home/ubuntu/app/logstash/config that listens on a port for log events:

input {
  tcp {
    # Logstash acts as the server
    mode => "server"
    #    host => "localhost"
    # Listen on port 9101 to collect logs
    port => 9101
    # Codec: one JSON document per line
    codec => json_lines
  }
}

output {
  elasticsearch {
    # ES address
    hosts => ["http://localhost:9200"]
    # Name of the index created in ES
    index => "app-search-log-collection-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
  stdout {
    codec => rubydebug
  }
}

Start Logstash with the specified config file

./bin/logstash -f ~/app/logstash/config/logstash-test.conf
# run in the background with nohup
nohup /home/ubuntu/app/logstash/bin/logstash -f /home/ubuntu/app/logstash/config/logstash-test.conf &
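Two optional sanity checks (assuming the paths and the default ES address used above): the first asks Logstash to parse the pipeline file and exit without starting it, the second confirms the daily index is being created once the application ships its first logs (see the logback section below).

./bin/logstash -f ~/app/logstash/config/logstash-test.conf --config.test_and_exit
curl "http://localhost:9200/_cat/indices?v"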

Finally, install Kibana to view the data stored in ES.

Download
https://www.elastic.co/cn/downloads/past-releases#kibana

Extract

tar -zxvf kibana-6.3.0-linux-x86_64.tar.gz -C ~/app/
mv kibana-6.3.0-linux-x86_64 kibana-6.3.0

Edit the config

server.port: 5601
server.host: "0.0.0.0"
elasticsearch.url: "http://localhost:9200"
i18n.defaultLocale: "zh-CN"

Start

/home/ubuntu/app/kibana-6.3.0/bin/kibana
# run in the background with nohup
nohup ./kibana &
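To confirm Kibana itself is up (a hedged check; /api/status is Kibana's standard status endpoint, assuming the host and port configured above):

curl http://localhost:5601/api/status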

Integrating logback in the project

The project uses Lombok; logstash-logback-encoder is also added:

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>6.3</version>
</dependency>

The project's logback configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>

    <property name="logstash.host" value="110.40.181.73"/>
    <property name="logstash.port" value="9101"/>
    <contextName>springcloud</contextName>


    <appender name="consoleLog" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>
                %date %level [%thread] %logger{36} [%file : %line] %msg%n
            </pattern>
        </layout>
    </appender>

    <appender name="fileInfoLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>$logging.path.%dyyyy-MM-dd.zip</fileNamePattern>
        </rollingPolicy>
        <!--&lt;!&ndash;过滤输出到日志文件的日志内容&ndash;&gt;
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>
                &lt;!&ndash;只配置这个还是会输出ERROR等级的日志,因为error level > info level;拦截的是>=info等级的内容&ndash;&gt;
                INFO
            </level>
        </filter>-->

        <!--要用下面这种方式才能过滤输出到日志文件的ERROR信息,类似switch case,onMatch和onMisMatch中的指令在LevelFilter类的FilterReply中可以看到-->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <!--匹配到上面level的ERROR等级日志时,禁止输出到日志文件中-->
            <onMatch>DENY</onMatch>
            <!--匹配到上面的ERROR日志时,输出到日志文件中,(还有一个指令NEUTRAL,代表当前这条策略不做操作,让下一条策略去匹配)-->
            <onMismatch>ACCEPT</onMismatch>
        </filter>
        <encoder>
            <pattern>
                %date %level [%thread] %logger36 [%file : %line] %msg%n
            </pattern>
        </encoder>
        <!--滚动策略,按照时间滚动-->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--路径要先去建好,不然会报错,linux/Mac下要给予可写权限,sudo chmod -R 777 /var/log/tomcat
            windows 类似,D:\\log\\tomcat-->
            <fileNamePattern>
                <!--在文件名中带上时间,配置完后要在下面root level中指定这个配置-->
                D:\\tomcat\\info.%d.log
            </fileNamePattern>
        </rollingPolicy>
    </appender>

    <appender name="fileErrorLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>$logging.path.%dyyyy-MM-dd.zip</fileNamePattern>
        </rollingPolicy>
        <!--过滤输出到日志文件的日志内容-->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>
                ERROR
            </level>
        </filter>
        <encoder>
            <pattern>
                %date %level [%thread] %logger36 [%file : %line] %msg%n
            </pattern>
        </encoder>
        <!--滚动策略,按照时间滚动,每天创建一个文件-->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--路径要先去建好,不然会报错,linux/Mac下要给予可写权限,sudo chmod -R 777 /var/log/tomcat
            windows 类似,D:\\log\\tomcat-->
            <fileNamePattern>
                <!--在文件名中带上时间,配置完后要在下面root level中指定这个配置-->
                D:\\tomcat\\error.%d.log
            </fileNamePattern>
        </rollingPolicy>
    </appender>



    <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <!-- <destination>$logstash.host:$logstash.port</destination>-->
        <remoteHost>$logstash.host</remoteHost>
        <port>$logstash.port</port>
<!--        这个值只能配置2的N次幂(2,4,8,16...),默认8192,单位是B-->
        <queueSize>1048576</queueSize>

        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <!--匹配到上面level的ERROR等级日志时,禁止输出到日志文件中-->
            <onMatch>ACCEPT</onMatch>
            <!--匹配到上面的ERROR日志时,输出到日志文件中,(还有一个指令NEUTRAL,代表当前这条策略不做操作,让下一条策略去匹配)-->
            <onMismatch>DENY</onMismatch>
        </filter>

        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>UTC</timeZone>
                </timestamp>
                <pattern>
                    <pattern>
                        
                        "severity":"%level",
                        "service": "%contextName",
                        "pid": "$PID:-",
                        "thread": "%thread",
                        "class": "%logger40",
                        "rest": "%message->%exfull"
                        
                    </pattern>
                </pattern>
            </providers>
        </encoder>
    </appender>

    <!--root sets the log level for the whole project-->
    <root level="info">
        <!--reference the appenders configured above by name-->
        <appender-ref ref="consoleLog"/>
        <appender-ref ref="fileInfoLog"/>
        <appender-ref ref="fileErrorLog"/>
        <appender-ref ref="LOGSTASH"/>
    </root>
</configuration>
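As a minimal sketch of the application side (the class and messages below are made up for illustration, assuming Lombok's @Slf4j and the logback config above on the classpath), any ERROR-level log ends up in ES via the LOGSTASH appender:

import lombok.extern.slf4j.Slf4j;

// Lombok's @Slf4j generates a static org.slf4j.Logger field named log
@Slf4j
public class OrderService {

    public void placeOrder(String orderId) {
        // INFO goes to the console and the info file appenders
        log.info("placing order {}", orderId);
        try {
            throw new IllegalStateException("inventory service unavailable");
        } catch (IllegalStateException e) {
            // ERROR is ACCEPTed by the LOGSTASH appender's LevelFilter,
            // encoded as JSON and shipped over TCP to Logstash on port 9101
            log.error("failed to place order {}", orderId, e);
        }
    }
}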

Open Kibana (visit the Kibana host's IP on the default port 5601) and take a look: the logs are there. Done.
