我已经用logstash输入和elasticsearch输出实现了kafka。它在kibana中工作正常..我想根据状态码过滤数据
Posted
技术标签:
【中文标题】我已经用logstash输入和elasticsearch输出实现了kafka。它在kibana中工作正常..我想根据状态码过滤数据【英文标题】:I have implemented the kafka with logstash input and elasticsearch output. its working fine in kibana.. I want to filter the data based on statuscode 【发布时间】:2022-01-17 16:54:54 【问题描述】:这是 kibana 仪表板 json 数据。这里我必须在消息 json 数据字段中过滤基于响应状态码。
"_index": "rand-topic",
"_type": "_doc",
"_id": "ulF8uH0BK9MbBSR7DPEw",
"_version": 1,
"_score": null,
"fields":
"@timestamp": [
"2021-12-14T10:27:56.956Z"
],
"@version": [
"1"
],
"@version.keyword": [
"1"
],
"message": [
"\"requestMethod\":\"GET\",\"headers\":\"content-type\":\"application/json\",\"user-agent\":\"PostmanRuntime/7.28.4\",\"accept\":\"*/*\",\"postman-token\":\"977fc94b-38c8-4df4-ad73-814871a32eca\",\"host\":\"localhost:5600\",\"accept-encoding\":\"gzip, deflate, br\",\"connection\":\"keep-alive\",\"content-length\":\"44\",\"body\":\"category\":\"CAT\",\"noise\":\"purr\",\"query\":,\"requestUrl\":\"http://localhost:5600/kafka\",\"protocol\":\"HTTP/1.1\",\"remoteIp\":\"1\",\"requestSize\":302,\"userAgent\":\"PostmanRuntime/7.28.4\",\"statusCode\":200,\"response\":\"success\":true,\"message\":\"Kafka Details are added\",\"data\":\"kafkaData\":\"_id\":\"61b871ac69be37078a9c1a79\",\"category\":\"DOG\",\"noise\":\"bark\",\"__v\":0,\"postData\":\"category\":\"DOG\",\"noise\":\"bark\",\"latency\":\"seconds\":0,\"nanos\":61000000,\"responseSize\":193"],"sort[1639477676956]
这样的预期输出这里添加了来自消息字段的状态码字段
"_index": "rand-topic",
"_type": "_doc",
"_id": "ulF8uH0BK9MbBSR7DPEw",
"_version": 1,
"_score": null,
"fields":
"@timestamp": [
"2021-12-14T10:27:56.956Z"
],
"@version": [
"1"
],
"@version.keyword": [
"1"
],
"statusCode": [
200
],
"message": [
"\"requestMethod\":\"GET\",\"headers\":\"content-
type\":\"application/json\",\"user-
agent\":\"PostmanRuntime/7.28.4\",\"accept\":\"*/*\",\"postman-
token\":\"977fc94b-38c8-4df4-ad73-
814871a32eca\",\"host\":\"localhost:5600\",\"accept-
encoding\":\"gzip, deflate, br\",\"connection\":\"keep-
alive\",\"content-length\":\"44\",\"body\":
\"category\":\"CAT\",\"noise\":\"purr\",\"query\": , \"requestUrl\":\"http://localhost:5600/kafka\",\"protocol\":\"HTTP/1.1\",\"remoteIp\":\"1\",\"requestSize\":302,\"userAgent\":\"PostmanRuntime/7.28.4\",\"statusCode\":200,\"response\":\"success\":true,\"message\":\"Kafka Details are added\",\"data\":\"kafkaData\":\"_id\":\"61b871ac69be37078a9c1a79\",\"category\":\"DOG\",\"noise\":\"bark\",\"__v\":0,\"postData\":\"category\":\"DOG\",\"noise\":\"bark\",\"latency\":\"seconds\":0,\"nanos\":61000000,\"responseSize\":193"
],"排序": [1639477676956]
请帮助我如何为 statusCode 配置 logstash 过滤器
input
kafka
topics => ["randtopic"]
bootstrap_servers => "192.168.29.138:9092"
filter
mutate
add_field =>
"statusCode" => "%[status]"
output
elasticsearch
hosts => ["192.168.29.138:9200"]
index => "rand-topic"
workers => 1
【问题讨论】:
【参考方案1】: output
if [message][0][statusCode] == "200"
Do Somethings ....
stdout codec => ""
【讨论】:
我需要使用消息字段 json (\"statusCode\":200,) 根据此响应中的状态码进行过滤,并将其设置在日志存储的过滤器配置中以将其显示为单独的字段和让消息字段就像一个字符串而不是 kibana 中的 json 现在我已经使用它了。但不会起作用 mutate add_field => "statusCode" => "%[status]" 试试这个 :mutate add_field => "StatusCode" => "%[message][statusCode]" 我尝试了 2 种方式.. 但它不起作用。变异 add_field => "statusCode" => "%[message][0][statusCode]" 。和 mutate add_field => "statusCode" => "%[message][statusCode]" 输出 statusCode : %[message][0][status] 这里的状态码值没有设置那个变量。它就像返回给定的字符串以上是关于我已经用logstash输入和elasticsearch输出实现了kafka。它在kibana中工作正常..我想根据状态码过滤数据的主要内容,如果未能解决你的问题,请参考以下文章
Centos7安装elasticsearchlogstashkibanaelasticsearch head