logstash filter-split 示例

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了logstash filter-split 示例相关的知识,希望对你有一定的参考价值。

参考技术A
# Pipeline: tail a log file, split matching events on "|", and write the
# result both to a file and to the console.
input {
  file {
    path => "/opt/logstash/config/aa.log"      # file to read
    start_position => "beginning"              # start reading from the beginning of the file
    discover_interval => 5                     # interval at which Logstash looks for new files
    max_open_files => 10                       # maximum number of files this input may watch
    close_older => 3600                        # close the file handle if the file has not been updated within this period
    sincedb_path => "/data/sincedb_test.txt"   # where the current read position is recorded
    sincedb_write_interval => 15
    # Text format of the file. charset is a codec option rather than a
    # file-input option, so it is nested inside the codec.
    codec => json {
      charset => "UTF-8"
    }
  }
}

filter {
  # Only events whose message contains "M00002" are split.
  if "M00002" in [message] {
    mutate {
      split => ["message", "|"]                # split the raw log line on "|"
    }
  }
}

output {
  file {
    path => "/opt/logstash/config/bb.txt"      # write events to a file
  }
  stdout {
    codec => rubydebug
  }
}

logstash.conf示例

参考logstash的一个入门资料: http://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html

输出ES时创建的索引模板定义:https://www.cnblogs.com/you-you-111/p/9844131.html

https://www.cnblogs.com/cangqinglang/p/12187801.html

# Pipeline: load movies.csv, derive title/year/genre fields, index into
# the "movies" index using the CSV id as the document id.
input {
  file {
    path           => "/Users/yiruan/dev/elk7/logstash-7.0.1/bin/movies.csv"
    start_position => "beginning"
    # No read position is persisted, so the file is re-read on every start.
    sincedb_path   => "/dev/null"
  }
}

filter {
  # Parse each line into the three CSV columns.
  csv {
    separator => ","
    columns   => ["id", "content", "genre"]
  }

  # Turn the "|"-delimited genre string into an array.
  mutate {
    split        => { "genre" => "|" }
    remove_field => ["path", "host", "@timestamp", "message"]
  }

  # Split content on "(" and copy the two parts into title and year.
  mutate {
    split     => ["content", "("]
    add_field => {
      "title" => "%{[content][0]}"
      "year"  => "%{[content][1]}"
    }
  }

  # Convert year to an integer, trim whitespace around the title, and drop
  # the fields that are no longer needed.
  mutate {
    convert      => { "year" => "integer" }
    strip        => ["title"]
    remove_field => ["path", "host", "@timestamp", "message", "content"]
  }
}

output {
  elasticsearch {
    hosts       => "http://localhost:9200"
    index       => "movies"
    document_id => "%{id}"
  }
  stdout {}
}
# Input: receive events from Beats.
input {
  beats {
    port => "5044"
    client_inactivity_timeout => 36000
  }
}

# Filter: extract query-log fields from the raw message.
filter {
  grok {
    # Two patterns are tried in order: the first also captures the client and
    # server address/port, the second is the fallback without them.
    # NOTE(review): "[ |\\r\\n]" only means "space, CR or LF" if
    # config.support_escapes is enabled; otherwise the class contains literal
    # backslash, 'r' and 'n' characters. Left unchanged - confirm the setting.
    match => {
      "message" => [
        "%{TIMESTAMP_ISO8601:log_timestamp}.* %{LOGLEVEL:log_level}.* %{INT:time_consuming}.* %{USERNAME:time_consuming_unit}:.*client/%{IP:client_ip}:%{INT:client_port}.*%{IP:server_ip}:%{INT:server_port}.* -[ |\\r\\n]%{GREEDYDATA:cypher} - {} - .*",
        "%{TIMESTAMP_ISO8601:log_timestamp}.* %{LOGLEVEL:log_level}.* %{INT:time_consuming}.* %{USERNAME:time_consuming_unit}:.* -[ |\\r\\n]%{GREEDYDATA:cypher} - {} - .*"
      ]
    }
    # "day" is rendered from the event's @timestamp and is used below to
    # build the daily index name.
    add_field => [ "received_at", "%{@timestamp}" ]
    add_field => [ "received_from", "%{host}" ]
    add_field => [ "day", "%{+YYYY.MM.dd}" ]
    remove_field => ["message","@timestamp"]
  }
  date {
    # log_timestamp comes from TIMESTAMP_ISO8601, i.e. a numeric month and an
    # optional zone offset directly after the fractional seconds. The previous
    # pattern ("YYYY-MMM-dd HH:mm:ss.SSS Z") expected a month name and a space
    # before the zone, so it could never parse the captured value and
    # @timestamp (removed by grok above) was never restored.
    match => [ "log_timestamp", "yyyy-MM-dd HH:mm:ss.SSSZ", "ISO8601" ]
  }
}

# Output: one Elasticsearch index per day, plus the console.
output {
  elasticsearch {
    hosts => "http://10.20.13.130:9200"
    index => "neo4j_querylog_%{day}"
  }
  stdout {}
}
# Input: receive events from Beats.
input {
  beats {
    port => "5044"
    client_inactivity_timeout => 36000
  }
}

# Filter: extract query-log fields from the raw message.
filter {
  grok {
    # Two patterns are tried in order: the first also captures the client and
    # server address/port, the second is the fallback without them.
    # NOTE(review): "[ |\\r\\n]" only means "space, CR or LF" if
    # config.support_escapes is enabled; otherwise the class contains literal
    # backslash, 'r' and 'n' characters. Left unchanged - confirm the setting.
    match => {
      "message" => [
        "%{TIMESTAMP_ISO8601:log_timestamp}.* %{LOGLEVEL:log_level}.* %{INT:time_consuming}.* %{USERNAME:time_consuming_unit}:.*client/%{IP:client_ip}:%{INT:client_port}.*%{IP:server_ip}:%{INT:server_port}.* -[ |\\r\\n]%{GREEDYDATA:cypher} - {} - .*",
        "%{TIMESTAMP_ISO8601:log_timestamp}.* %{LOGLEVEL:log_level}.* %{INT:time_consuming}.* %{USERNAME:time_consuming_unit}:.* -[ |\\r\\n]%{GREEDYDATA:cypher} - {} - .*"
      ]
    }
    # "day" is rendered from the event's @timestamp and is used below to
    # build the daily index name.
    add_field => [ "received_at", "%{@timestamp}" ]
    add_field => [ "received_from", "%{host}" ]
    add_field => [ "day", "%{+YYYY.MM.dd}" ]
    remove_field => ["message","@timestamp"]
  }
  # Disabled conversion kept from the original.
  # NOTE(review): "int" is not a documented mutate convert type; "integer" is.
#	mutate {
#        		convert => ["time_consuming", "int"]
#   	}
  date {
    # log_timestamp comes from TIMESTAMP_ISO8601, i.e. a numeric month and an
    # optional zone offset directly after the fractional seconds. The previous
    # pattern ("YYYY-MMM-dd HH:mm:ss.SSS Z") expected a month name and a space
    # before the zone, so it could never parse the captured value and
    # @timestamp (removed by grok above) was never restored.
    match => [ "log_timestamp", "yyyy-MM-dd HH:mm:ss.SSSZ", "ISO8601" ]
  }
}

# Output: one Elasticsearch index per day, created from the supplied index
# template, plus the console.
output {
  elasticsearch {
    hosts => "http://10.20.13.130:9200"
    index => "logstash_neo4j_querylog_%{day}"
    template => "/home/ubuntu/ongdbETL/logstash-7.5.1/bin/conf/logstash_neo4j_querylog.json"
    template_name => "logstash_neo4j_querylog_*"
    template_overwrite => true
  }
  stdout {}
}
{
  "index_patterns": ["logstash_neo4j_querylog_*"],
  "order": 1,
  "settings": {
    "number_of_replicas": 1,
    "number_of_shards": 3,
    "refresh_interval": "1s",
    "translog": {
      "flush_threshold_size": "1.6gb"
    },
    "merge": {
      "scheduler": {
        "max_thread_count": "1"
      }
    },
    "index": {
      "routing": {
        "allocation": {
          "total_shards_per_node": "2"
        }
      }
    },
    "analysis": {
      "normalizer": {
        "my_normalizer": {
          "type": "custom",
          "filter": [
            "lowercase",
            "asciifolding"
          ]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "time_consuming": {
        "index": true,
        "store": true,
        "type": "integer"
      },
      "time_consuming_unit": {
        "index": true,
        "store": true,
        "type": "keyword"
      },
      "client_ip": {
        "index": true,
        "store": true,
        "type": "keyword"
      },
      "client_port": {
        "index": true,
        "store": true,
        "type": "keyword"
      },
      "server_ip": {
        "index": true,
        "store": true,
        "type": "keyword"
      },
      "server_port": {
        "index": true,
        "store": true,
        "type": "keyword"
      },
      "cypher": {
        "index": true,
        "store": true,
        "type": "keyword"
      },
      "received_from": {
        "index": true,
        "store": true,
        "type": "keyword"
      },
      "received_at": {
        "index": true,
        "store": true,
        "type": "keyword"
      },
      "log_level": {
        "index": true,
        "store": true,
        "type": "keyword"
      },
      "log_timestamp": {
        "index": true,
        "store": true,
        "type": "keyword"
      }
    }
  },
  "aliases": {
    "logstash_neo4j_querylog": {}
  }
}

 

以上是关于logstash filter-split 示例的主要内容,如果未能解决你的问题,请参考以下文章

kibana、logstash、elasticsearch 怎么读

logstash.conf示例

logstash.conf示例

Logstash: Grok 模式示例

Logstash: Grok 模式示例

Logstash处理数据用法示例---待完善