Creating a dynamic index from kafka-filebeat
Asked: 2022-01-18 07:10:30
Question: Software versions: ES-OSS-7.4.2, filebeat-OSS-7.4.2
Below are my filebeat.yml and my grok pipeline.
filebeat.inputs:
- type: kafka
  hosts:
    - test-bigdata-kafka0003:9092
    - test-bigdata-kafka0002:9092
    - test-bigdata-kafka0001:9092
  topics: ["bigdata-k8s-test-serverlog"]
  group_id: "filebeat-kafka-test"

setup.template.settings:
  index.number_of_shards: 1
  _source.enabled: true
setup.template.name: "test"
setup.template.pattern: "test-*"
setup.template.overwrite: true
setup.template.enabled: true
setup.ilm.enable: true
setup.ilm.rollover_alias: "test"

setup.kibana:
  host: "https://xxx:8080"
  username: "superuser"
  password: "123456"
  ssl.verification_mode: none

output.elasticsearch:
  index: "test-%{[jiserver]}-%{+yyyy.MM.dd}"
  pipeline: "test-pipeline"
  hosts: ["xxx:8200"]
  username: "superuser"
  password: "123456"

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
pipeline.json
{
  "description": "Test pipeline",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{CUSTOMTIME:timestamp} (?:%{NOTSPACE:jiserver}|-) (?:%{NOTSPACE:hostname}|-) (?:%{LOGLEVEL:level}|-) (?:%{NOTSPACE:thread}|-) (?:%{NOTSPACE:class}|-) (?:%{NOTSPACE:method}|-) (?:%{NOTSPACE:line}|-) (?:%{CUSTOMDATA:message}|-)"],
        "pattern_definitions": {
          "CUSTOMTIME": "%{YEAR}[- ]%{MONTHNUM}[- ]%{MONTHDAY}[- ]%{TIME}",
          "CUSTOMDATA": "((%{GREEDYDATA})[[:space:]]?)+"
        }
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}
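I use grok to split the message into separate fields, one of which is jiserver. I want to name my index dynamically using jiserver. How can I do that? The settings above do not work, and I get this error: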
[elasticsearch] elasticsearch/client.go:541 Bulk item insert failed (i=0, status=500): {"type":"string_index_out_of_bounds_exception","reason":"String index out of range: 0"}
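To see which field each space-separated token of a log line ends up in, the grok pattern from the question can be roughly approximated with a plain JavaScript regular expression. This is only a sketch: the regular expression is a simplification of the grok pattern (for example, `level` accepts any non-space token), and the names `LOG_LINE`, `parseLine`, and the sample log line are hypothetical, not taken from the question.

```javascript
// Rough JavaScript approximation of the grok pattern from the question.
// Assumption: the timestamp looks like "yyyy-MM-dd HH:mm:ss" (optionally with
// fractional seconds), so it occupies the first two space-separated tokens.
const LOG_LINE = new RegExp(
  "^(?<timestamp>\\d{4}[- ]\\d{1,2}[- ]\\d{1,2}[- ]\\d{1,2}:\\d{2}:\\d{2}(?:[.,]\\d+)?)" +
  " (?<jiserver>\\S+) (?<hostname>\\S+) (?<level>\\S+) (?<thread>\\S+)" +
  " (?<class>\\S+) (?<method>\\S+) (?<line>\\S+) (?<message>[\\s\\S]*)$"
);

// Returns an object with the named fields, or null if the line does not match.
function parseLine(text) {
  const match = LOG_LINE.exec(text);
  return match ? { ...match.groups } : null;
}

// Hypothetical sample line; real log lines may differ.
const sample =
  "2022-01-18 07:10:30 order-service node-01 INFO main com.example.App run 42 started ok";
const fields = parseLine(sample);

console.log(fields.jiserver);      // order-service
console.log(sample.split(" ")[2]); // order-service
```

Under this assumed layout, jiserver is the third space-separated token of the line, because the date and the time each take one token.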
Answer 1: I found a solution. Add a script processor to filebeat.yml:
processors:
  # Copy the third space-separated token of the message into the jiserver field.
  - script:
      lang: javascript
      id: my_filter
      source: >
        function process(event) {
            var message = event.Get("message");
            var name = message.split(" ");
            event.Put("jiserver", name[2]);
        }
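A likely reason this helps is that Filebeat resolves the index format string itself before sending the event to Elasticsearch, so the jiserver field presumably has to exist on the event at that point rather than only after the ingest pipeline has run. The script's logic can be tried outside Filebeat with a small stand-in for the event object. In this sketch, `makeEvent` and the sample message are hypothetical, and the function is named `extractJiserver` only to avoid clashing with Node's global `process`; inside filebeat.yml it must stay `process`.

```javascript
// Same logic as the script processor in the answer.
// In filebeat.yml this function must be named "process".
function extractJiserver(event) {
  var message = event.Get("message");
  var name = message.split(" ");
  event.Put("jiserver", name[2]);
}

// Hypothetical stand-in for Filebeat's event object, for local testing only.
// It mimics just the Get and Put calls that the script uses.
function makeEvent(fields) {
  return {
    Get: function (key) { return fields[key]; },
    Put: function (key, value) { fields[key] = value; }
  };
}

// Hypothetical sample message: date, time, then the jiserver token.
var sampleEvent = makeEvent({
  message: "2022-01-18 07:10:30 order-service node-01 INFO main com.example.App run 42 started ok"
});
extractJiserver(sampleEvent);
console.log(sampleEvent.Get("jiserver")); // order-service

// A message with fewer than three tokens leaves jiserver undefined.
var shortEvent = makeEvent({ message: "too short" });
extractJiserver(shortEvent);
console.log(shortEvent.Get("jiserver")); // undefined
```

Since a short or malformed line yields an undefined jiserver, it may be worth checking `name.length` before calling `event.Put`, so that such events do not end up with an incomplete index name.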