logstash filter-split examples
input {
  file {
    path => "/opt/logstash/config/aa.log"       # file to read
    start_position => "beginning"               # read from the beginning of the file
    discover_interval => 5                      # interval (seconds) at which Logstash looks for new files
    max_open_files => 10                        # maximum number of files this input may watch at once
    close_older => 3600                         # close the file handle if the file has not been updated within this window
    sincedb_path => "/data/sincedb_test.txt"    # where to record the current read position
    sincedb_write_interval => 15
    codec => json { charset => "UTF-8" }        # codec for the input text (charset is a codec option)
  }
}
filter {
  if "M00002" in [message] {
    mutate {
      split => ["message", "|"]                 # split the raw log line on "|"
    }
  }
}
output {
  file {
    path => "/opt/logstash/config/bb.txt"       # write events to a file
  }
  stdout { codec => rubydebug }
}
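For illustration, assuming a decoded event whose message field holds the invented line "M00002|login|user01", the split filter turns the string into an array, roughly as stdout's rubydebug codec would show:

    # hypothetical event before the filter
    "message" => "M00002|login|user01"

    # after mutate split => ["message", "|"]
    "message" => ["M00002", "login", "user01"]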
logstash.conf examples
A getting-started reference for Logstash: http://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html
On the index templates created when writing to ES: https://www.cnblogs.com/you-you-111/p/9844131.html
https://www.cnblogs.com/cangqinglang/p/12187801.html
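The first complete pipeline below loads a CSV of movies, splits the pipe-separated genre field into an array, splits content on "(" to separate the title from the release year, and writes the result to Elasticsearch using the CSV id column as the document id.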
input {
file {
path => "/Users/yiruan/dev/elk7/logstash-7.0.1/bin/movies.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
csv {
separator => ","
columns => ["id", "content", "genre"]
}
mutate {
split => {
"genre" => "|"
}
remove_field => ["path", "host", "@timestamp", "message"]
}
mutate {
split => ["content", "("]
add_field => {
  "title" => "%{[content][0]}"
  "year" => "%{[content][1]}"
}
}
mutate {
convert => {
"year" => "integer"
}
strip => ["title"]
remove_field => ["path", "host", "@timestamp", "message", "content"]
}
}
output {
elasticsearch {
hosts => "http://localhost:9200"
index => "movies"
document_id => "%{id}"
}
stdout {}
}
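As a concrete walk-through, assuming a hypothetical MovieLens-style row in movies.csv:

    1,Toy Story (1995),Adventure|Animation|Children

the filters would produce a document along the lines of:

    {
        "id"    => "1",
        "title" => "Toy Story",
        "year"  => 1995,
        "genre" => ["Adventure", "Animation", "Children"]
    }

Because content is split on "(", the captured year substring is actually "1995)"; mutate's integer conversion coerces the leading digits, and strip removes the trailing space left on the title.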
The next pipeline ships Neo4j query logs via Beats, parses them with grok, and writes them to daily Elasticsearch indices.
# input
input {
beats {
port => "5044"
client_inactivity_timeout => 36000
}
}
# filter
filter {
grok {
match => {
"message" => [
"%{TIMESTAMP_ISO8601:log_timestamp}.* %{LOGLEVEL:log_level}.* %{INT:time_consuming}.* %{USERNAME:time_consuming_unit}:.*client/%{IP:client_ip}:%{INT:client_port}.*%{IP:server_ip}:%{INT:server_port}.* -[ |\\r\\n]%{GREEDYDATA:cypher} - {} - .*",
"%{TIMESTAMP_ISO8601:log_timestamp}.* %{LOGLEVEL:log_level}.* %{INT:time_consuming}.* %{USERNAME:time_consuming_unit}:.* -[ |\\r\\n]%{GREEDYDATA:cypher} - {} - .*"
]
}
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
add_field => [ "day", "%{+YYYY.MM.dd}" ]
remove_field => ["message","@timestamp"]
}
date {
match => [ "log_timestamp", "YYYY-MMM-dd HH:mm:ss.SSS Z" ]
}
}
# output
output {
elasticsearch {
hosts => "http://10.20.13.130:9200"
index => "neo4j_querylog_%{day}"
}
stdout {}
}
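The two grok patterns target Neo4j query.log lines, with and without the client/server address block. A hypothetical line of roughly the expected shape:

    2020-01-12 10:20:32.123+0000 INFO 9 ms: bolt-session bolt client/127.0.0.1:53810 server/10.20.13.130:7687 - MATCH (n) RETURN count(n) - {} - {}

would yield log_timestamp "2020-01-12 10:20:32.123+0000", log_level "INFO", time_consuming "9", time_consuming_unit "ms", the client and server IP/port pairs, and the Cypher statement in cypher. The day field added in grok is interpolated into the index name, producing daily indices such as neo4j_querylog_2020.01.12.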
The same pipeline again, this time applying a custom index template on output (a type conversion for time_consuming is left commented out):
# input
input {
beats {
port => "5044"
client_inactivity_timeout => 36000
}
}
# filter
filter {
grok {
match => {
"message" => [
"%{TIMESTAMP_ISO8601:log_timestamp}.* %{LOGLEVEL:log_level}.* %{INT:time_consuming}.* %{USERNAME:time_consuming_unit}:.*client/%{IP:client_ip}:%{INT:client_port}.*%{IP:server_ip}:%{INT:server_port}.* -[ |\\r\\n]%{GREEDYDATA:cypher} - {} - .*",
"%{TIMESTAMP_ISO8601:log_timestamp}.* %{LOGLEVEL:log_level}.* %{INT:time_consuming}.* %{USERNAME:time_consuming_unit}:.* -[ |\\r\\n]%{GREEDYDATA:cypher} - {} - .*"
]
}
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
add_field => [ "day", "%{+YYYY.MM.dd}" ]
remove_field => ["message","@timestamp"]
}
# mutate {
#   convert => { "time_consuming" => "integer" }
# }
date {
match => [ "log_timestamp", "YYYY-MMM-dd HH:mm:ss.SSS Z" ]
}
}
# output
output {
elasticsearch {
hosts => "http://10.20.13.130:9200"
index => "logstash_neo4j_querylog_%{day}"
template => "/home/ubuntu/ongdbETL/logstash-7.5.1/bin/conf/logstash_neo4j_querylog.json"
template_name => "logstash_neo4j_querylog_*"
template_overwrite => true
}
stdout {}
}
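The template option points Logstash at the JSON file shown below (logstash_neo4j_querylog.json), which it uploads to Elasticsearch under template_name before indexing. One compatibility note: legacy templates named their pattern with a "template" key, but Elasticsearch 6.0+ expects "index_patterns", as used here.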
{
"template": "logstash_neo4j_querylog_*",
"order": 1,
"settings": {
"number_of_replicas": 1,
"number_of_shards": 3,
"refresh_interval": "1s",
"translog": {
"flush_threshold_size": "1.6gb"
},
"merge": {
"scheduler": {
"max_thread_count": "1"
}
},
"index": {
"routing": {
"allocation": {
"total_shards_per_node": "2"
}
}
},
"analysis": {
"normalizer": {
"my_normalizer": {
"type": "custom",
"filter": [
"lowercase",
"asciifolding"
]
}
}
}
},
"mappings": {
"properties": {
"time_consuming": {
"index": true,
"store": true,
"type": "integer"
},
"time_consuming_unit": {
"index": true,
"store": true,
"type": "keyword"
},
"client_ip": {
"index": true,
"store": true,
"type": "keyword"
},
"client_port": {
"index": true,
"store": true,
"type": "keyword"
},
"server_ip": {
"index": true,
"store": true,
"type": "keyword"
},
"server_port": {
"index": true,
"store": true,
"type": "keyword"
},
"cypher": {
"index": true,
"store": true,
"type": "keyword"
},
"received_from": {
"index": true,
"store": true,
"type": "keyword"
},
"received_at": {
"index": true,
"store": true,
"type": "keyword"
},
"log_level": {
"index": true,
"store": true,
"type": "keyword"
},
"log_timestamp": {
"index": true,
"store": true,
"type": "keyword"
}
}
},
"aliases": {
"logstash_neo4j_querylog": {}
}
}
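With template_overwrite => true, Logstash replaces any existing copy on startup. The installed template can be inspected with the legacy template API:

    GET _template/logstash_neo4j_querylog_*

Each new daily index matching the pattern then picks up these settings and mappings and is attached to the logstash_neo4j_querylog alias.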