Fluentd 日志处理-S3拉取日志处理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Fluentd 日志处理-S3拉取日志处理相关的知识,希望对你有一定的参考价值。

S3日志拉取
这里是S3插件的官方文档
https://github.com/fluent/fluent-plugin-s3
使用前,我们需要安装好S3插件
我们因为从AWS的S3里拉取日志,需要在AWS上做一些配置:
1.我们需要创建一个SQS的队列,然后授予每个桶SQS的权限。
2.还需要给每个桶添加事件通知,这样S3桶一有更新,就会把更新扔到SQS里面。

<source>
    @type s3
    aws_key_id       XXXXXXXXXXXXXXXXXXXX
    aws_sec_key    XXXXXXXXXXXXXXXXXXXX
    s3_bucket        nx-rc-rancher-newelb-inner
    s3_region         cn-northwest-1
    tagelb-inner
    <sqs>
       queue_name  fluentd-s3      #这里只需要配置SQS的名字
    </sqs>
</source>
<source>
    @type s3
    aws_key_id      XXXXXXXXXXXXXXXXXXXX
    aws_sec_key    XXXXXXXXXXXXXXXXXXXX
    s3_bucket       nx-rc-rancher-newelb-com
    s3_region       cn-northwest-1
    <sqs>
       queue_name  fluentd-s3
    </sqs>
</source>
<source>
    @type s3
    aws_key_id       XXXXXXXXXXXXXXXXXXXX
    aws_sec_key    XXXXXXXXXXXXXXXXXXXX
    s3_bucket        nx-rc-www-newelb-com
    s3_region        cn-northwest-1
    <sqs>
queue_name      fluentd-s3
    </sqs>
</source>

这里的rewrite_tag_filter插件相当于logstash的if语句。

因为fluentd是通过流的形式进行信息的过滤和处理,而且没有if语句,只能通过重写tag来筛选不同的日志
官方文档
https://github.com/fluent/fluent-plugin-rewrite-tag-filter

<match elb-inner>
    @type rewrite_tag_filter
    <rule>
        key     message
        pattern /.yufuid.net:80/
        tag     elb-inner.net      #匹配的就重写标签
    </rule>
</match>
<filter elb-inner.net>
    @type parser                   #对日志就行分割和命名处理,fluentd的正则表达式可能需要自己写
    key_name message
    reserve_data yes
    <parse>
    @type regexp  
    expression /(?<elb_http_method>[^ ]+) (?<access_timestamp>[^ ]+) (?<elb_name>[^ ]+) (?<client_ip>[^ ]+):(?<client_Port>[^ ]+) (?<target_ip_port>[^ ]+) (?<request_processing_time>[^ ]+) (?<target_processing_time>[^ ]+) (?<response_processing_time>[^ ]+) (?<elb_status_code>[^ ]+) (?<target_status_code>[^ ]+) (?<received_bytes>[^ ]+) (?<send_bytes>[^ ]+) (?<request>"[^"]+") (?<client_info>"[^"]+") (?<ssl_cipher>[^ ]+) (?<ssl_protocol>[^ ]+) (?<target_group_arn>[^ ]+) (?<trace_ip>[^ ]+) (?<domainname>[^ ]+) (?<chose_cert_arn>[^ ]+) (?<matched_rule_priority>[^ ]+) (?<elb_name>[^ ]+) (?<request_creation_time>[^ ]+)/
    </parse>
</filter>
<filter elb-com>
    @type parser
    key_name message
    reserve_data yes
    <parse>
    @type regexp
            expression /(?<elb_http_method>[^ ]+) (?<access_timestamp>[^ ]+) (?<elb_name>[^ ]+) (?<client_ip>[^ ]+):(?<client_Port>[^ ]+) (?<target_ip_port>[^ ]+) (?<request_processing_time>[^ ]+) (?<target_processing_time>[^ ]+) (?<response_processing_time>[^ ]+) (?<elb_status_code>[^ ]+) (?<target_status_code>[^ ]+) (?<received_bytes>[^ ]+) (?<send_bytes>[^ ]+) (?<request>"[^"]+") (?<client_info>"[^"]+") (?<ssl_cipher>[^ ]+) (?<ssl_protocol>[^ ]+) (?<target_group_arn>[^ ]+) (?<trace_ip>[^ ]+) (?<domainname>[^ ]+) (?<chose_cert_arn>[^ ]+) (?<matched_rule_priority>[^ ]+) (?<elb_name>[^ ]+) (?<request_creation_time>[^ ]+)/
    </parse>
</filter>
<filter elb-www>
    @type parser
    key_name message
    reserve_data yes
    <parse>
        @type regexp
        expression  /(?<elb_http_method>[^ ]+) (?<access_timestamp>[^ ]+) (?<elb_name>[^ ]+) (?<client_ip>[^ ]+):(?<client_Port>[^ ]+) (?<target_ip_port>[^ ]+) (?<request_processing_time>[^ ]+) (?<target_processing_time>[^ ]+) (?<response_processing_time>[^ ]+) (?<elb_status_code>[^ ]+) (?<target_status_code>[^ ]+) (?<received_bytes>[^ ]+) (?<send_bytes>[^ ]+) (?<request>"[^"]+") (?<client_info>"[^"]+") (?<ssl_cipher>[^ ]+) (?<ssl_protocol>[^ ]+) (?<target_group_arn>[^ ]+) (?<trace_ip>[^ ]+) (?<domainname>[^ ]+) (?<chose_cert_arn>[^ ]+) (?<matched_rule_priority>[^ ]+) (?<elb_name>[^ ]+) (?<request_creation_time>[^ ]+)/
    </parse>
</filter>

这里是把处理过的日志数据输出到ES集群

<match elb-www>
    @type elasticsearch
    host elasticsearchlog-lb.elasticsearch-log
    index_name    fluentd-www-elb
    type_name     fluentd-www-elb
    flush_interval 2s                     #这里的刷新是把buffer的数据及时发送到ES,保持数据的实时性
    include_timestamp true         #这里需配置时间字段,便于kibana的时间跟踪
    ssl_verify    false
</match>
<match elb-com>
    @type elasticsearch
    host elasticsearchlog-lb.elasticsearch-log
    index_name    fluentd-elb
    type_name     fluentd-elb
    flush_interval 2s
    include_timestamp true
    ssl_verify    false
</match>
<match elb-inner.net>
    @type elasticsearch
    host elasticsearchlog-lb.elasticsearch-log
    index_name    fluentd-elb
    type_name     fluentd-elb
    flush_interval 2s
    include_timestamp true
    ssl_verify    false
</match>

以上是关于Fluentd 日志处理-S3拉取日志处理的主要内容,如果未能解决你的问题,请参考以下文章

fluentd日志处理-安装配置

Fluentd日志处理-插件使用和调试问题

是否可以使用 fluentd 的 redis_store 输出插件来处理大量日志?

ELK系列~对fluentd参数的理解

详解三款日志采集工具--Logstash,Fluentd, Logtail比较

centos7下安装docker(18.3docker日志---logging driver---fluentd)