In Logstash, how to remove any JSON/XML field larger than a specific size

Posted: 2020-08-31 13:29:27

Question:

In short, my company uses this stack for our corporate logs:

All Request/Response Log Files -> Filebeat -> Kafka -> Logstash -> Elasticsearch

A very common approach.

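However, an unexpected request/response format may contain a very large XML/JSON field. I want to remove only that specific field/node, at whatever level of the JSON or XML structure it sits, because a request/response can be either SOAP (XML) or REST (JSON).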

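In other words, I don't know the request/response message tree/structure in advance, and I don't want to discard the whole message based on its total size; I only want to discard the specific fields/nodes that are larger than a specific size.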
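For the SOAP (XML) case, one way to express "drop any node larger than N bytes, wherever it is" is to walk the parsed tree and remove elements whose own text is too large. The sketch below assumes Ruby's REXML library is available (it ships with Ruby and JRuby) and that measuring only an element's direct text/CDATA children is acceptable, so a parent survives while its oversized child is removed. The method name prune_large_xml_nodes is hypothetical, not an existing Logstash or REXML API.

```ruby
require 'rexml/document'

# Hypothetical helper: removes every XML element whose own text content
# (its direct text/CDATA children) is larger than max_bytes, at any depth,
# and returns the serialized document.
def prune_large_xml_nodes(xml_string, max_bytes)
  doc = REXML::Document.new(xml_string)

  # Collect first and remove afterwards, so the tree is not modified
  # while it is being traversed.
  oversized = []
  doc.each_recursive do |element|
    own_text = element.texts.map(&:value).join
    oversized << element if own_text.bytesize > max_bytes
  end

  oversized.each(&:remove)
  doc.to_s
end
```

Measuring direct text rather than the whole subtree is the design choice that keeps envelopes and bodies intact: only the leaf carrying the large payload disappears.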

For example:

2019-12-03 21:41:59.409  INFO 4055 --- [ntainer#0-0-C-1] Transaction Consumer                     : Message received successfully: "serviceId":"insertEft_TransferPropias","sourceTransaction":"CMMO","xml":"PD94bWw some very large base 64 data ...
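For the JSON case, the requirement maps to a recursive walk over the parsed payload. The sketch below is an illustration under stated assumptions, not code from the question: the method names prune_large_values and leaf_too_large? are hypothetical, "size" is taken as the UTF-8 byte size of a leaf's string form (as clarified in the comments), and objects/arrays are descended into rather than dropped, so only oversized leaves are removed.

```ruby
require 'json'

# Hypothetical helper: returns a copy of a parsed JSON structure without any
# Hash entry or Array element whose leaf value exceeds max_bytes. Nested
# Hashes and Arrays are walked recursively instead of being dropped.
def prune_large_values(node, max_bytes)
  case node
  when Hash
    node.each_with_object({}) do |(key, value), out|
      next if leaf_too_large?(value, max_bytes)
      out[key] = prune_large_values(value, max_bytes)
    end
  when Array
    node.reject { |value| leaf_too_large?(value, max_bytes) }
        .map { |value| prune_large_values(value, max_bytes) }
  else
    node
  end
end

# A leaf is anything that is not a Hash or Array; it is measured in bytes
# of its string form.
def leaf_too_large?(value, max_bytes)
  return false if value.is_a?(Hash) || value.is_a?(Array)
  value.to_s.bytesize > max_bytes
end
```

Applied to a payload like the one above, with a limit smaller than the base64 string, serviceId and sourceTransaction would be kept and the large xml value dropped; the result can be written back with to_json.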

My full docker-compose file is:

version: '3.2'
services:

  zoo1:
    image: elevy/zookeeper:latest
    environment:
      MYID: 1
      SERVERS: zoo1
    ports:
      - "2181:2181"

  kafka1:
    image: wurstmeister/kafka
    command: [start-kafka.sh]
    depends_on:
      - zoo1
    links:
      - zoo1
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: "168"
      KAFKA_LOG_RETENTION_BYTES: "100000000"
      KAFKA_ZOOKEEPER_CONNECT:  zoo1:2181
      KAFKA_CREATE_TOPICS: "log:1:1"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  filebeat:
    image: docker.elastic.co/beats/filebeat:7.5.2
    command: filebeat -e -strict.perms=false
    volumes:
      - "//c/Users/Cast/docker_folders/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro"
      - "//c/Users/Cast/docker_folders/sample-logs:/sample-logs"
    links:
      - kafka1
    depends_on:
      - kafka1

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.security.enabled=false
      - xpack.watcher.enabled=false
      - discovery.type=single-node
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - "//c/Users/Cast/docker_folders/esdata:/usr/share/elasticsearch/data"
    ports:
      - "9200:9200"

  kibana:
    image: docker.elastic.co/kibana/kibana:7.5.2
    volumes:
      - "//c/Users/Cast/docker_folders/kibana.yml:/usr/share/kibana/config/kibana.yml"
    restart: always
    environment:
    - SERVER_NAME=kibana.localhost
    - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    links:
      - elasticsearch
    depends_on:
      - elasticsearch

  logstash:
    image: docker.elastic.co/logstash/logstash:7.5.2
    volumes:
      - "//c/Users/Cast/docker_folders/logstash.conf:/config-dir/logstash.conf"
    restart: always
    command: logstash -f /config-dir/logstash.conf
    ports:
      - "9600:9600"
      - "7777:7777"
    links:
      - elasticsearch
      - kafka1

logstash.conf

input {
  kafka {
    codec => "json"
    bootstrap_servers => "kafka1:9092"
    topics => ["app_logs","request_logs"]
    tags => ["my-app"]
  }
}

filter {
    if [fields][topic_name] == "app_logs" {
        grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} *%{LOGLEVEL:level} %{DATA:pid} --- *\[%{DATA:application}] *%{DATA:class} : %{GREEDYDATA:msglog}" }
            # on failure grok adds this tag instead of the default "_grokparsefailure"
            tag_on_failure => ["not_date_line"]
        }
        date {
            match => ["timestamp", "ISO8601"]
            target => "timestamp"
        }
        if "not_date_line" in [tags] {
            mutate {
                add_field => { "level" => "UNKNOWN" }
            }
        }
    }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[fields][topic_name]}-%{+YYYY.MM.dd}"
  }
}

Envisioned solution (pseudocode):

...
        grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} *%{LOGLEVEL:level} %{DATA:pid} --- *\[%{DATA:application}] *%{DATA:class} : %{GREEDYDATA:msglog}" }
            tag_on_failure => ["not_date_line"]
        }
...
        if "not_date_line" in [tags] {
            mutate {
                # pseudocode: this field list would have to be discovered dynamically, based on size
                remove_field => [ "field1", "field2", "field3", ... "fieldN" ]
            }
        }
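The "dynamically discovered" field list in the pseudocode above could be computed from the event's hash. The sketch below assumes the fields to check are nested Hashes (as produced by the json filter) and that key names contain no square brackets; the method names oversized_field_refs and byte_size are hypothetical. Strings are measured by their byte size, and arrays and other values by the size of their JSON text, so arrays are treated as leaves.

```ruby
require 'json'

# Hypothetical helper: walks nested Hashes and returns Logstash-style field
# references (e.g. "[data][entity][xml]") for every value whose size
# exceeds max_bytes.
def oversized_field_refs(hash, max_bytes, prefix = "")
  hash.flat_map do |key, value|
    ref = "#{prefix}[#{key}]"
    if value.is_a?(Hash)
      oversized_field_refs(value, max_bytes, ref)
    elsif byte_size(value) > max_bytes
      [ref]
    else
      []
    end
  end
end

# Strings are measured directly; anything else by its JSON serialization.
def byte_size(value)
  value.is_a?(String) ? value.bytesize : value.to_json.bytesize
end
```

Inside a Logstash ruby filter (for example loaded from a script file), the references could then be removed with something like oversized_field_refs(event.to_hash, 1024).each { |ref| event.remove(ref) }, where 1024 is an illustrative limit.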

*** EDITED

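I'm not sure how good this approach is, mainly because it seems to me that I would be forcing Logstash to act as a blocking stage, loading all the JSON into memory and parsing it before saving it to Elastic. It has not been tested under stress scenarios yet. A colleague of mine proposed this alternative: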

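input ...
filter {
    if "JAVALOG" in [tags] {
        grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{WORD:severity} (?<thread>\[.*]) (?<obj>.*)" }
        }

        # parse the JSON payload that follows the thread name into [data]
        json {
            source => "obj"
            target => "data"
            skip_on_invalid_json => true
        }
        # [data][entity] holds the request body as a JSON string
        json {
            source => "[data][entity]"
            target => "request"
            skip_on_invalid_json => true
        }

        mutate { remove_field => [ "message" ] }
        mutate { remove_field => [ "obj" ] }
        mutate { lowercase => [ "[tags][0]" ] }
        mutate { lowercase => [ "meta_path" ] }
        ruby {
            init => "require 'json'"
            code => '
                entity = event.get("[data][entity]")
                request_msg = entity.is_a?(String) ? JSON.parse(entity) : nil
                if request_msg.is_a?(Hash)
                    # delete_if removes top-level keys safely while iterating
                    request_msg.delete_if do |key, value|
                        logger.info("field is: #{key}")
                        too_long = value.to_s.length > 10
                        logger.info("field length is greater than 10!") if too_long
                        too_long
                    end
                    # to_json keeps the field valid JSON (Hash#to_s would produce Ruby inspect output)
                    event.set("[data][entity]", request_msg.to_json)
                end
            '
        }

        mutate { remove_field => ["request"] }
        json {
            source => "data"
            target => "data_1"
            skip_on_invalid_json => true
        }
    }
}

output ...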

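Comments on the question:

What exactly do you mean by size: the character length or the byte size of the field?

@apt-get_install_skill Byte size.

I'm looking forward to hearing any clues or suggestions here.

Answer 1: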

Have you considered using the settings available in the (Elasticsearch index) template used by Logstash?

Here is an example:

PUT my_index
{
  "mappings": {
    "properties": {
      "message": {
        "type": "keyword",
        "ignore_above": 20
      }
    }
  }
}

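Source: https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-above.html

Note that ignore_above only affects indexing: strings longer than the limit are not indexed or stored, but the original value is still kept in the document's _source. The limit is also counted in characters, not bytes.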
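Because the request/response structure is not known in advance, mapping a single "message" field may not be enough. A dynamic template can apply ignore_above to every string field Elasticsearch maps dynamically. The sketch below only builds that request body in Ruby; the template name "long_strings" and the 1024 limit are illustrative assumptions, and the body could be sent with PUT my_index as in the answer, or placed in an index template whose pattern matches the Logstash indices.

```ruby
require 'json'

# Hypothetical builder for an index body that maps every dynamically
# detected string field as a keyword with the given ignore_above limit.
def index_body(limit)
  {
    "mappings" => {
      "dynamic_templates" => [
        {
          "long_strings" => {
            "match_mapping_type" => "string",
            "mapping" => { "type" => "keyword", "ignore_above" => limit }
          }
        }
      ]
    }
  }
end

puts JSON.pretty_generate(index_body(1024))
```

Mapping all strings as keyword gives up full-text search on them; keeping a text field with a keyword sub-field is a common alternative if that matters.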

Comments:

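Thanks a lot. You gave me a new idea: instead of cutting the field off in a Logstash filter, I can cut it off when it is saved to the Elastic index, which gives exactly the same result. I also added another workaround proposed by a colleague of mine, but your idea is certainly the better approach. I added it to the original question to help future readers.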