I have implemented Kafka with a Logstash input and Elasticsearch output. It works fine in Kibana. I want to filter the data based on status code


[Posted]: 2022-01-17 16:54:54

[Question]:

Here is the Kibana document JSON. I need to filter on the response status code that is embedded inside the message field's JSON data.


{
  "_index": "rand-topic",
  "_type": "_doc",
  "_id": "ulF8uH0BK9MbBSR7DPEw",
  "_version": 1,
  "_score": null,
  "fields": {
    "@timestamp": [
      "2021-12-14T10:27:56.956Z"
    ],
    "@version": [
      "1"
    ],
    "@version.keyword": [
      "1"
    ],
    "message": [
      "\"requestMethod\":\"GET\",\"headers\":\"content-type\":\"application/json\",\"user-agent\":\"PostmanRuntime/7.28.4\",\"accept\":\"*/*\",\"postman-token\":\"977fc94b-38c8-4df4-ad73-814871a32eca\",\"host\":\"localhost:5600\",\"accept-encoding\":\"gzip, deflate, br\",\"connection\":\"keep-alive\",\"content-length\":\"44\",\"body\":\"category\":\"CAT\",\"noise\":\"purr\",\"query\":,\"requestUrl\":\"http://localhost:5600/kafka\",\"protocol\":\"HTTP/1.1\",\"remoteIp\":\"1\",\"requestSize\":302,\"userAgent\":\"PostmanRuntime/7.28.4\",\"statusCode\":200,\"response\":\"success\":true,\"message\":\"Kafka Details are added\",\"data\":\"kafkaData\":\"_id\":\"61b871ac69be37078a9c1a79\",\"category\":\"DOG\",\"noise\":\"bark\",\"__v\":0,\"postData\":\"category\":\"DOG\",\"noise\":\"bark\",\"latency\":\"seconds\":0,\"nanos\":61000000,\"responseSize\":193"
    ]
  },
  "sort": [1639477676956]
}

The expected output is like this, with a statusCode field added, taken from the message field:


{
  "_index": "rand-topic",
  "_type": "_doc",
  "_id": "ulF8uH0BK9MbBSR7DPEw",
  "_version": 1,
  "_score": null,
  "fields": {
    "@timestamp": [
      "2021-12-14T10:27:56.956Z"
    ],
    "@version": [
      "1"
    ],
    "@version.keyword": [
      "1"
    ],
    "statusCode": [
      200
    ],
    "message": [
      "\"requestMethod\":\"GET\",\"headers\":\"content-type\":\"application/json\",\"user-agent\":\"PostmanRuntime/7.28.4\",\"accept\":\"*/*\",\"postman-token\":\"977fc94b-38c8-4df4-ad73-814871a32eca\",\"host\":\"localhost:5600\",\"accept-encoding\":\"gzip, deflate, br\",\"connection\":\"keep-alive\",\"content-length\":\"44\",\"body\":\"category\":\"CAT\",\"noise\":\"purr\",\"query\":,\"requestUrl\":\"http://localhost:5600/kafka\",\"protocol\":\"HTTP/1.1\",\"remoteIp\":\"1\",\"requestSize\":302,\"userAgent\":\"PostmanRuntime/7.28.4\",\"statusCode\":200,\"response\":\"success\":true,\"message\":\"Kafka Details are added\",\"data\":\"kafkaData\":\"_id\":\"61b871ac69be37078a9c1a79\",\"category\":\"DOG\",\"noise\":\"bark\",\"__v\":0,\"postData\":\"category\":\"DOG\",\"noise\":\"bark\",\"latency\":\"seconds\":0,\"nanos\":61000000,\"responseSize\":193"
    ]
  },
  "sort": [1639477676956]
}

Please help me with how to configure a Logstash filter for statusCode.

input {
  kafka {
    topics => ["randtopic"]
    bootstrap_servers => "192.168.29.138:9092"
  }
}

filter {
  mutate {
    add_field => {
      "statusCode" => "%{[status]}"
    }
  }
}

output {
  elasticsearch {
    hosts => ["192.168.29.138:9200"]
    index => "rand-topic"
    workers => 1
  }
}
[Comments]:

[Answer 1]:
output {
  if [message][0][statusCode] == "200" {
    # Do something ...
    stdout { codec => "" }
  }
}
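The conditional above assumes [message][0][statusCode] already exists as a parsed field, but in the sample event the message is one raw string, so that reference never resolves. A minimal sketch of a filter that extracts the code from the raw string with grok (the pattern and the target field name are assumptions based on the sample message):

filter {
  # Pull the numeric status code out of the raw message string.
  # Assumes the string always contains "statusCode":<number>.
  grok {
    match => { "message" => '"statusCode":%{NUMBER:statusCode:int}' }
  }
}

With the :int cast, statusCode is indexed as a number, so a later conditional would compare if [statusCode] == 200 rather than the string "200", and the message field itself stays an unparsed string, as requested in the comments.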

[Discussion]:

I need to filter based on the status code in the message field's JSON (\"statusCode\":200,), set this up in the Logstash filter config so that it shows up as a separate field, and keep the message field as a plain string rather than JSON in Kibana. Right now I have used mutate add_field => { "statusCode" => "%{[status]}" }, but it does not work.

Try this: mutate { add_field => { "StatusCode" => "%{[message][statusCode]}" } }

I tried it both ways, but neither works: mutate add_field => { "statusCode" => "%{[message][0][statusCode]}" } and mutate add_field => { "statusCode" => "%{[message][statusCode]}" }. The output is statusCode: %{[message][0][status]}; the value is not substituted into that variable, it just returns the given string literally.
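One reason the mutate attempts in these comments fail is sprintf syntax: a Logstash field reference inside a string must be written %{[field]}, while a bare "%[field]" is copied literally. But even with correct syntax, [message][statusCode] does not exist until the string is parsed into fields. A hedged sketch, assuming the payload were valid JSON (the sample message above is not well-formed, so the grok route is likely more robust here; the target name "payload" is an illustrative choice):

filter {
  # Parse the message string into a structured object first ...
  json {
    source => "message"
    target => "payload"
  }
  # ... then copy the parsed value into a top-level field.
  mutate {
    add_field => { "statusCode" => "%{[payload][statusCode]}" }
  }
}

Using a target keeps the original message field intact as a string while the parsed copy lives under payload.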
