从 kafka-filebeat 创建动态索引

Posted

技术标签:

【中文标题】从 kafka-filebeat 创建动态索引【英文标题】:creating dynamic index from kafka-filebeat 【发布时间】:2022-01-18 07:10:30 【问题描述】:

软件版本:ES-OSS-7.4.2,filebeat-OSS-7.4.2

以下是我的 filebeat.yml 和 grok 管道

filebeat.inputs:
- type: kafka
  hosts:
    - test-bigdata-kafka0003:9092
    - test-bigdata-kafka0002:9092
    - test-bigdata-kafka0001:9092
  topics: ["bigdata-k8s-test-serverlog"]
  group_id: "filebeat-kafka-test"
setup.template.settings:
  index.number_of_shards: 1
  _source.enabled: true
setup.template.name: "test"
setup.template.pattern: "test-*"
setup.template.overwrite: true
setup.template.enabled: true
setup.ilm.enable: true
setup.ilm.rollover_alias: "test"
setup.kibana:
  host: "https://xxx:8080"
  username: "superuser"
  password: "123456"
  ssl.verification_mode: none
output.elasticsearch:
  index: "test-%[jiserver]-%+yyyy.MM.dd"
  pipeline: "test-pipeline"
  hosts: ["xxx:8200"]
  username: "superuser"
  password: "123456"
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

管道.json


    "description": "Test pipeline",
    "processors": [
        
            "grok": 
                "field": "message",
                "patterns": ["%CUSTOMTIME:timestamp (?:%NOTSPACE:jiserver|-) (?:%NOTSPACE:hostname|-) (?:%LOGLEVEL:level|-) (?:%NOTSPACE:thread|-) (?:%NOTSPACE:class|-) (?:%NOTSPACE:method|-) (?:%NOTSPACE:line|-) (?:%CUSTOMDATA:message|-)"],
                "pattern_definitions": 
                    "CUSTOMTIME": "%YEAR[- ]%MONTHNUM[- ]%MONTHDAY[- ]%TIME",
                    "CUSTOMDATA": "((%GREEDYDATA)[[:space:]]?)+"
                
            
        
    ],
    "on_failure": [
        
          "set": 
            "field": "error_information",
            "value": "Processor  _ingest.on_failure_processor_type  with tag  _ingest.on_failure_processor_tag  in pipeline  _ingest.on_failure_pipeline  failed with message  _ingest.on_failure_message "
          
        
    ]

我使用 grok 将消息拆分到不同的字段,其中之一是 jiserver 。我想用jiserver动态命名我的索引,怎么办。以上设置无效,接收错误

[elasticsearch] elasticsearch/client.go:541 Bulk item insert failed (i=0, status=500): "type":"string_index_out_of_bounds_exception","reason":"String index out of range: 0"

【问题讨论】:

【参考方案1】:

我找到了解决办法。filebeat.yml 添加脚本处理器

processors:
  - script:
      lang: javascript
      id: my_filter
      source: >
        function process(event) 
            var message = event.Get("message");
            var name = message.split(" ")
            event.Put("jiserver", name[2])
        

【讨论】:

以上是关于从 kafka-filebeat 创建动态索引的主要内容,如果未能解决你的问题,请参考以下文章

如何在ExtJS Tabpanel中显示JSON表单字段?

es 创建动态索引(二)

ES 索引模板和动态模板

es 创建动态索引(一)

如何创建从数据库中动态提取的静态 WordPress 页面?

如何知道动态创建的 TabItem 的索引