In LogStash, how to remove any json/xml field larger than a specific size
Posted: 2020-08-31 13:29:27

Question:
In short, this is the stack we use at my company for our corporate logs:
All Request/Response Log Files -> Filebeat -> Kafka -> Logstash -> Elasticsearch
A very common approach.
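Nevertheless, a request/response in an unexpected format may contain a very large xml/json field. I want to remove only that specific field/node, at whatever level of the json or xml structure it sits, since a request/response can be either SOAP (XML) or REST (JSON).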
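In other words, I don't know the request/response message tree/structure in advance, and I don't want to discard the whole message based on its total size; I only want to drop the specific fields/nodes that are larger than a given size.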
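A minimal sketch of the pruning step for the JSON case, assuming the payload has already been parsed into Ruby hashes and arrays (for example with `JSON.parse` inside a Logstash ruby filter). The method name `prune_large_values!`, the 1024-byte limit and the `body`/`status`/`attachments` sample fields are hypothetical. It walks the structure at any depth and deletes every leaf value whose string form is larger than the limit in bytes, keeping the rest of the tree.

```ruby
require "json"

# Hypothetical helper: recursively delete leaf values larger than max_bytes.
# Hashes lose the offending key, arrays lose the offending element.
def prune_large_values!(node, max_bytes)
  # Returns true when a value should be removed from its parent.
  too_big = lambda do |value|
    if value.is_a?(Hash) || value.is_a?(Array)
      prune_large_values!(value, max_bytes) # recurse; containers themselves are kept
      false
    else
      value.to_s.bytesize > max_bytes       # byte size, not character count
    end
  end

  case node
  when Hash  then node.delete_if { |_key, value| too_big.call(value) }
  when Array then node.delete_if { |value| too_big.call(value) }
  end
  node
end

raw = {
  "serviceId" => "insertEft_TransferPropias",
  "sourceTransaction" => "CMMO",
  "body" => { "xml" => "A" * 2000, "status" => "OK" },
  "attachments" => ["small", "B" * 5000]
}.to_json

payload = JSON.parse(raw)
prune_large_values!(payload, 1024)
puts payload.to_json
# prints {"serviceId":"insertEft_TransferPropias","sourceTransaction":"CMMO","body":{"status":"OK"},"attachments":["small"]}
```

Containers are never removed themselves, so an object whose only field was oversized ends up as `{}` rather than disappearing.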
For example:
2019-12-03 21:41:59.409 INFO 4055 --- [ntainer#0-0-C-1] Transaction Consumer : Message received successfully: {"serviceId":"insertEft_TransferPropias","sourceTransaction":"CMMO","xml":"PD94bWw some very large base 64 data ...
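For the SOAP (XML) case, the same idea can be sketched with Ruby's REXML library, assuming the XML is available as a plain string. In the example line above, the `xml` value starts with `PD94bWw`, which is base64 for `<?xml`, so it would have to be decoded first. The method name, the sample envelope and the 1024-byte limit are hypothetical; an element is dropped when its own text is larger than the limit, otherwise its children are checked.

```ruby
require "rexml/document"

# Hypothetical helper: remove child elements whose own text exceeds max_bytes.
# Element#text returns only the element's first text node.
def prune_large_elements!(element, max_bytes)
  # to_a takes a snapshot, so deleting children does not disturb the loop
  element.elements.to_a.each do |child|
    if child.text.to_s.bytesize > max_bytes
      element.delete_element(child)           # drop the oversized node entirely
    else
      prune_large_elements!(child, max_bytes) # otherwise look one level deeper
    end
  end
  element
end

soap = <<~XML
  <Envelope>
    <Body>
      <serviceId>insertEft_TransferPropias</serviceId>
      <document>#{"A" * 2000}</document>
    </Body>
  </Envelope>
XML

doc = REXML::Document.new(soap)
prune_large_elements!(doc.root, 1024)
puts doc.to_s
```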
My whole docker-compose file is:
version: '3.2'
services:

  zoo1:
    image: elevy/zookeeper:latest
    environment:
      MYID: 1
      SERVERS: zoo1
    ports:
      - "2181:2181"

  kafka1:
    image: wurstmeister/kafka
    command: [start-kafka.sh]
    depends_on:
      - zoo1
    links:
      - zoo1
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: "168"
      KAFKA_LOG_RETENTION_BYTES: "100000000"
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181
      KAFKA_CREATE_TOPICS: "log:1:1"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  filebeat:
    image: docker.elastic.co/beats/filebeat:7.5.2
    command: filebeat -e -strict.perms=false
    volumes:
      - "//c/Users/Cast/docker_folders/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro"
      - "//c/Users/Cast/docker_folders/sample-logs:/sample-logs"
    links:
      - kafka1
    depends_on:
      - kafka1

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.security.enabled=false
      - xpack.watcher.enabled=false
      - discovery.type=single-node
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - "//c/Users/Cast/docker_folders/esdata:/usr/share/elasticsearch/data"
    ports:
      - "9200:9200"

  kibana:
    image: docker.elastic.co/kibana/kibana:7.5.2
    volumes:
      - "//c/Users/Cast/docker_folders/kibana.yml:/usr/share/kibana/config/kibana.yml"
    restart: always
    environment:
      - SERVER_NAME=kibana.localhost
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    links:
      - elasticsearch
    depends_on:
      - elasticsearch

  logstash:
    image: docker.elastic.co/logstash/logstash:7.5.2
    volumes:
      - "//c/Users/Cast/docker_folders/logstash.conf:/config-dir/logstash.conf"
    restart: always
    command: logstash -f /config-dir/logstash.conf
    ports:
      - "9600:9600"
      - "7777:7777"
    links:
      - elasticsearch
      - kafka1
My logstash.conf:
input {
  kafka {
    codec => "json"
    bootstrap_servers => "kafka1:9092"
    topics => ["app_logs","request_logs"]
    tags => ["my-app"]
  }
}
filter {
  if [fields][topic_name] == "app_logs" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} *%{LOGLEVEL:level} %{DATA:pid} --- *\[%{DATA:application}] *%{DATA:class} : %{GREEDYDATA:msglog}" }
      tag_on_failure => ["not_date_line"]
    }
    date {
      match => ["timestamp", "ISO8601"]
      target => "timestamp"
    }
    # tag_on_failure replaces grok's default "_grokparsefailure" tag,
    # so lines that fail to parse carry "not_date_line" instead
    if "not_date_line" in [tags] {
      mutate {
        add_field => { "level" => "UNKNOWN" }
      }
    }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[fields][topic_name]}-%{+YYYY.MM.dd}"
  }
}
Envisioned solution:
...
grok {
  match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} *%{LOGLEVEL:level} %{DATA:pid} --- *\[%{DATA:application}] *%{DATA:class} : %{GREEDYDATA:msglog}" }
  tag_on_failure => ["not_date_line"]
}
...
# grok tags failures with "not_date_line" here, not "_grokparsefailure"
if "not_date_line" in [tags] {
  # pseudocode: field1 ... fieldN would have to be discovered dynamically, based on their size
  mutate { remove_field => [ "field1", "field2", "field3", ... "fieldN" ] }
}
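The field list that the pseudocode above wants to discover dynamically could be computed instead of hard-coded. A minimal sketch, assuming the event data is available as nested hashes; `oversized_field_refs` and the sample `event_data` are hypothetical. It returns Logstash-style field references (`[a][b]`) for every non-hash value larger than the limit in bytes; inside a Logstash ruby filter, each reference could then be passed to `event.remove(ref)` rather than listed in `mutate { remove_field => ... }`.

```ruby
require "json"

# Hypothetical helper: collect Logstash field references ("[a][b]") for every
# non-hash value whose serialized size exceeds max_bytes.
def oversized_field_refs(hash, max_bytes, prefix = "")
  hash.flat_map do |key, value|
    ref = "#{prefix}[#{key}]"
    if value.is_a?(Hash)
      oversized_field_refs(value, max_bytes, ref)         # descend into nested objects
    else
      text = value.is_a?(String) ? value : value.to_json  # arrays and numbers as JSON text
      text.bytesize > max_bytes ? [ref] : []
    end
  end
end

event_data = {
  "serviceId" => "insertEft_TransferPropias",
  "request"   => { "header" => { "id" => 1 }, "xml" => "PD94bWw" + "A" * 4000 },
  "blob"      => "B" * 2048
}

p oversized_field_refs(event_data, 1024)
# prints ["[request][xml]", "[blob]"]
```

Arrays are treated as a single value here, so an oversized array is removed as a whole; keys that themselves contain `[` or `]` would not produce valid references.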
*** EDITED
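I'm not sure how good this approach is, mainly because it seems to me that I would be forcing Logstash to behave as a blocking stage, loading the whole JSON into memory and parsing it before saving it to Elastic. It has not been tested under stress yet. Anyway, a colleague of mine proposed this alternative: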
input ...
filter {
  if "JAVALOG" in [tags] {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{WORD:severity} (?<thread>\[.*]) (?<obj>.*)" }
    }
    json {
      source => "obj"
      target => "data"
      skip_on_invalid_json => true
    }
    json {
      source => "[data][entity]"
      target => "request"
      skip_on_invalid_json => true
    }
    mutate { remove_field => [ "message" ] }
    mutate { remove_field => [ "obj" ] }
    mutate { lowercase => [ "[tags][0]" ] }
    mutate { lowercase => [ "meta_path" ] }
    ruby {
      init => "require 'json'"
      code => '
        request_msg = JSON.parse(event.get("[data][entity]"))
        # delete_if removes entries safely while iterating (top-level keys only)
        request_msg.delete_if do |key, value|
          logger.info("field is: #{key}")
          too_long = value.to_s.length > 10
          logger.info("field length is greater than 10!") if too_long
          too_long
        end
        # to_json writes valid JSON back; Hash#to_s would produce Ruby inspect syntax
        event.set("[data][entity]", request_msg.to_json)
      '
    }
    mutate { remove_field => ["request"] }
    json {
      source => "data"
      target => "data_1"
      skip_on_invalid_json => true
    }
  }
}
output ...
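The colleague's filter compares `value.to_s.length`, which counts characters, while the comments below settle on byte size. The two only differ for multi-byte UTF-8 text; a small sketch of a byte-based check (the helper name `larger_than_bytes?` and the sample string are hypothetical):

```ruby
# Hypothetical helper: true when the value's UTF-8 encoding exceeds max_bytes.
def larger_than_bytes?(value, max_bytes)
  value.to_s.bytesize > max_bytes
end

text = "transação" # 9 characters, but "ç" and "ã" take 2 bytes each in UTF-8

puts text.length                  # prints 9  (character count)
puts text.bytesize                # prints 11 (byte count)
puts larger_than_bytes?(text, 10) # prints true  (byte-based check drops it)
puts text.length > 10             # prints false (character-based check keeps it)
```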
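Question comments:

What exactly do you mean by size: the character length of the field, or its byte size?
@apt-get_install_skill Byte size. I'm looking forward to hearing any clues or suggestions here.

Answer 1:
Have you considered using the settings available in the logstash template?
Here is an example: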
PUT my_index
{
  "mappings": {
    "properties": {
      "message": {
        "type": "keyword",
        "ignore_above": 20
      }
    }
  }
}
Source: https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-above.html
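Because the field names are not known in advance, the same `ignore_above` setting could be applied to every string field, at any depth, through a dynamic template instead of a per-field mapping. A sketch that builds such a request body with Ruby's json library; the index name `my_index`, the template name `strings_ignore_above` and the limit are assumptions. Per the Elasticsearch documentation, `ignore_above` counts characters rather than bytes, and an ignored value is only left out of the index, so it still appears in `_source`. Mapping every string as `keyword` also gives up full-text search on those fields, and for the daily indices written by the logstash.conf above, the `mappings` block would have to go into an index template rather than a single `PUT my_index`.

```ruby
require "json"

# Hypothetical limit, in characters (ignore_above does not count bytes).
IGNORE_ABOVE_CHARS = 1024

body = {
  "mappings" => {
    "dynamic_templates" => [
      {
        "strings_ignore_above" => {
          "match_mapping_type" => "string", # any dynamically detected string field
          "mapping" => { "type" => "keyword", "ignore_above" => IGNORE_ABOVE_CHARS }
        }
      }
    ]
  }
}

# Print the request in the form it could be pasted into Kibana Dev Tools.
puts "PUT my_index"
puts JSON.pretty_generate(body)
```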
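Comments:
Thank you very much. You gave me a new idea: instead of cutting the field off in a Logstash filter, I can cut it off when it is saved to the Elastic index, which gives exactly the same result. I have also added another workaround proposed by a colleague of mine, but your idea is certainly the better approach. I added it to the original question to help future readers.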