ELK同步kafka带有key的Message
Posted firstsword
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ELK同步kafka带有key的Message相关的知识,希望对你有一定的参考价值。
需求
kafka中的message带有key,带有相同key值的message后入kafka的意味着更新message,message值为null则意味着删除message。
用logstash来同步kafka中这样的数据入Elasticsearch,从而实现可以同步增删改数据。
环境
1) logstash-6.5.4
2) kafka中topic的message带有key
解决(Logstash的配置如下)
input {
kafka {
bootstrap_servers=> "192.168.31.92:9092"
group_id => "test_group"
topics => ["test_topic"]
type => "test_type"
auto_offset_reset => earliest
consumer_threads => 1
decorate_events => true
codec => plain {
format => ""
}
}
}
filter{
if ([message] == "") {
mutate {
add_field => { "@esaction" => "delete"}
}
mutate {
remove_field => ["@version"]
}
} else {
json {
source => "message"
}
mutate {
remove_field => ["@version","message"]
}
mutate {
add_field => { "@esaction" => "index"}
}
date {
match => ["updated","UNIX_MS"]
target => "@timestamp"
}
}
}
output {
elasticsearch {
hosts => ["192.168.21.80:9200"]
index => "test_index"
document_id => "%{[@metadata][kafka][key]}"
action => "%{[@esaction]}"
codec => "json"
}
}
以上是关于ELK同步kafka带有key的Message的主要内容,如果未能解决你的问题,请参考以下文章
kafka-Message日志和索引文件消费组rebalance
spring boot 整合kafka 报错 Exception thrown when sending a message with key='null' and payload=J