logstash写入es重复id

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了logstash写入es重复id相关的知识,希望对你有一定的参考价值。

参考技术A 背景:

ai引擎检测到的攻击数据会写入kafka,然后通过logstash采集到es,提供给安全人员做人工评判。因为攻击可能重复出现,所以我们希望攻击(url)中只出现一次。并且安全人员需要在攻击的基础上添加评论和判断结果(ai判断的攻击有可能是假的  有可能是真的,所以需要操作这个索引)

期望:

我们希望攻击第一次出现的时候写入es中,当后续再次出现相同攻击的时候,我们以url作为索引id,对之前的数据进行覆盖,保证一个攻击只出现一次。并且保证党安全人员对该条文档操作的时候,结果字段和评论字段保留不覆盖,其他字段可以覆盖

操作:

确定的一件事是把url作为索引中唯一的标识,保证索引唯一

如下

对于es来说id重复的话,之前的会被覆盖,整个流程是先根据id删除之前的 ,然后在新增一条新数据,这样的后果是如果安全人员在该条日志上做了评论,则重复的id会把评论字段覆盖掉。所以需要解决如下问题。

尝试一:

输出中有个action字段,action的可选值为index,create,update,detele。默认为index。当时选择了update,但是发现采用update的时候会出现这种情况:

如果id存在,则覆盖,如果id不存在,则打印一条错误日志(不会影响logstash的运行)

这样的话实际上还是会覆盖,不能满足我们的要求

action

Value type is  string

Default value is "index"

Protocol agnostic (i.e. non-http, non-java specific) configs go here Protocol agnostic methods The Elasticsearch action to perform. Valid actions are:

index: indexes a document (an event from Logstash).

delete: deletes a document by id (An id is required for this action)

create: indexes a document, fails if a document by that id already exists in the index.

update: updates a document by id. Update has a special case where you can upsert — update a document if not already present. See the upsert option. NOTE: This does not work and is not supported in Elasticsearch 1.x. Please upgrade to ES 2.x or greater to use this feature with Logstash!

尝试二:

elasticsearch的输出中添加doc_as_upsert => true

但是尝试之后发现

当id存在的时候会覆盖,当id不存的时候会新增,这样也不满足我们的需求。所以这种方法也不适合

尝试三:

我们不采用覆盖的方式,如果之前_id存在的话,直接抛弃掉新数据。这样的话评论之类的数据不会被覆盖。如果_id不存在的话  直接新增。

解决方案是把尝试一中的action由update改为create

这个方法解决了我们的问题

参考:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-doc_as_upsert

Logstash收集数据并写入Mysql数据库

ES中的日志后续会被删除,但有些重要数据,比如状态码、客户端IP、客户端浏览器版本等,后期可以会按月或年做数据统计等。因此需要持久保存

1.安装Mysql数据库并修改配置​

apt-get -y install mysql-server
#修改配置
vim /etc/mysql/mysql.conf.d/mysqld.cnf
bind-address = 0.0.0.0
#重启
systemctl restart mysql

2.创建库和表并授权用户登录​

#进入mysql界面后执行如下操作:
create database elk character set utf8 collate utf8_bin;
create user elk@% identified by 123456;
grant all privileges on elk.* to elk@%;
flush privileges;
use elk
#创建表,字段对应需要保存的数据字段
create table elklog (clientip varchar(39),responsetime float(10,3),uri varchar(256),status char(3),time timestamp default current_timestamp );

3.Logstash 配置 mysql-connector-java包

官方下载地址:​​​https://dev.mysql.com/downloads/​

Logstash收集数据并写入Mysql数据库_Mysql

Logstash收集数据并写入Mysql数据库_ELK_02


#在logstash服务器执行
dpkg -i mysql-connector-j_8.0.32-1ubuntu22.04_all.deb
mkdir -p /usr/share/logstash/vendor/jar/jdbc
cp /usr/share/java/mysql-connector-j-8.0.32.jar /usr/share/logstash/vendor/jar/jdbc

4.更改gem源

Logstash 基于 Ruby 语言实现。Ruby 语言使用国外的gem源, 由于网络原因,从国内访问很慢而且不稳定

#在logstash服务器执行
apt-get -y install ruby
#由于默认源是国外的,需要指定为国内源
gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/

5.Logstash安装对应插件

查看已经安装有关jdbc的插件

Logstash收集数据并写入Mysql数据库_Mysql_03

还需要安装output-jdbc插件

/usr/share/logstash/bin/logstash-plugin install logstash-output-jdbc

查看是否安装成功,如果有logstash-output-jdbc说明安装成功:

/usr/share/logstash/bin/logstash-plugin list | grep jdbc

Logstash收集数据并写入Mysql数据库_Mysql_04


6.配置Logstash将数据写入数据库​

vim nginx_to_mysql_es.conf
input
file
path => "/var/log/nginx/access.log"
type => "nginx-accesslog"
#指定输入格式为json
codec => json
start_position => "beginning"
stat_interval => "3"


output
if [type] == "nginx-accesslog"
elasticsearch
hosts => ["192.168.131.11:9200","192.168.131.12:9200","192.168.131.13:9200"]
index => "nginx-accesslog-%+YYYY.MM.dd"

jdbc
#指定mysql连接驱动
driver_jar_path => "/usr/share/logstash/vendor/jar/jdbc/mysql-connector-j-8.0.32.jar"
connection_string => "jdbc:mysql://192.168.131.14/elk?user=elk&password=123456&useUnicode=true&characterEncoding=UTF8"
statement => ["INSERT INTO elklog(clientip,responsetime,uri,status) VALUES(?,?,?,?)","clientip","responsetime","uri","status"]


之后重启logstash​

执行:/usr/share/logstash/bin/logstash -f nginx_to_mysql_es.conf

向nginx日志追加内容:head -n 10 access_json.log-20220304 >> /var/log/nginx/access.log

此时查看数据库,发现已经有数据了:

Logstash收集数据并写入Mysql数据库_ELK_05


以上是关于logstash写入es重复id的主要内容,如果未能解决你的问题,请参考以下文章

Logstash接收Kafka数据写入至ES

当ES向logstash发出reject

es 无日志,logstash 报错

es redis logstash 日志收集系统排错

Logstash收集数据并写入Mysql数据库

python 消费kafka 写入es 小记