Formatting Tomcat access log output and shipping it to Kafka

Posted by fengjian1585


 

cat /data/tomcat/conf/server.xml

<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
prefix="localhost_access_log" suffix=".txt"
pattern="%h %l %u %t &quot;%r&quot; %s" />
<Context docBase="/data/webserver/" path="/" reloadable="false" />

With this pattern, the access log output looks like this:

 172.16.200.16 - - [21/Oct/2016:16:55:03 +0800] "GET /static/My97DatePicker/skin/WdatePicker.css HTTP/1.1" 304
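Each token in the Valve pattern maps to a field in the sample line: %h is the client address (172.16.200.16), %l and %u are the identd and authenticated user names (both "-" here), %t is the request time in brackets, %r is the first request line in quotes, and %s is the HTTP status code (304). The grok expression in the shipper configuration below extracts exactly these fields as clientip, ident, auth, timestamp, verb/request/httpversion, and response.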

 

 

Logstash shipper configuration (runs on the Tomcat host, reads the access log and publishes it to Kafka):

input {
  file {
    # tail the Tomcat access log; read the file from the start on first run
    path => "/root/localhost_access_log.2016-10-21.txt"
    start_position => "beginning"
    type => "tomcat_access"
  }
}

filter {
    if [type] == "tomcat_access" {
        grok {
           # split the access-log line into named fields matching the Valve pattern above
           match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request}(?: HTTP/%{NUMBER:httpversion})?|-)\" %{NUMBER:response}"}
        }
   }
}
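The grok only captures the request time into a plain timestamp field; @timestamp stays the time Logstash read the line. If events should be indexed by the actual request time, an optional date filter (not part of the original configuration) can be added inside the same conditional, for example:

filter {
    if [type] == "tomcat_access" {
        date {
            # parse e.g. "21/Oct/2016:16:55:03 +0800" into the event @timestamp
            match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
        }
    }
}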

output {
    if [type] == "tomcat_access" {
        kafka {
            # publish parsed events to the tomcat_access.log topic, snappy-compressed
            bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
            topic_id => "tomcat_access.log"
            compression_type => "snappy"
        }
    }
}
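One thing to watch: whether the fields extracted by grok (and the type field the conditionals rely on) survive the trip through Kafka depends on the default codecs of the kafka plugin versions in use. If the indexer only receives a flat string, explicitly setting a JSON codec on both sides keeps the event structure intact. This is a sketch under that assumption, not part of the original configuration:

# on the shipper's kafka output
kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    topic_id => "tomcat_access.log"
    compression_type => "snappy"
    codec => json
}

# on the indexer's kafka input
kafka {
    zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
    topic_id => "tomcat_access.log"
    codec => json
}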

 

Logstash indexer configuration (server side; consumes from Kafka and writes to Elasticsearch):

input {
    # Conditionals are not supported inside input blocks, so each Kafka topic
    # gets its own kafka input. The type set by the shipper travels with the
    # event (type on an input only applies when the event has no type yet),
    # and the output conditionals below route on it.
    kafka {
        zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
        topic_id => "haproxy_http.log"
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
        type => "haproxy_http"
    }
    kafka {
        zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
        topic_id => "haproxy_tcp.log"
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
        type => "haproxy_tcp"
    }
    kafka {
        zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
        topic_id => "tomcat_access.log"
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true
        type => "tomcat_access"
    }
}
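decorate_events => true asks the kafka input to attach Kafka metadata (topic, partition, offset and so on) to each event, which helps when debugging routing problems. consumer_threads is normally sized to the number of partitions in the topic, so 5 threads only pays off if the topics actually have at least that many partitions.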


output {
    if [type] == "haproxy_http" {
        elasticsearch {
            hosts => ["es1:9200","es2:9200","es3:9200"]
            manage_template => true
            index => "logstash-haproxy-http.log-%{+YYYY-MM-dd}"
        }
    }
    if [type] == "haproxy_tcp" {
        elasticsearch {
            hosts => ["es1:9200","es2:9200","es3:9200"]
            manage_template => true
            index => "logstash-haproxy-tcp.log-%{+YYYY-MM-dd}"
        }
    }
    if [type] == "tomcat_access" {
        elasticsearch {
            hosts => ["es1:9200","es2:9200","es3:9200"]
            manage_template => true
            index => "logstash-tomcat_http.log-%{+YYYY-MM-dd}"
        }
    }
}
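To confirm the whole pipeline is working, check that the daily indices are being created in Elasticsearch (the host below is just the first node from the configuration above):

curl 'http://es1:9200/_cat/indices/logstash-*?v'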

 



