logstash写入es重复id
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了logstash写入es重复id相关的知识,希望对你有一定的参考价值。
参考技术A 背景:ai引擎检测到的攻击数据会写入kafka,然后通过logstash采集到es,提供给安全人员做人工评判。因为攻击可能重复出现,所以我们希望攻击(url)中只出现一次。并且安全人员需要在攻击的基础上添加评论和判断结果(ai判断的攻击有可能是假的 有可能是真的,所以需要操作这个索引)
期望:
我们希望攻击第一次出现的时候写入es中,当后续再次出现相同攻击的时候,我们以url作为索引id,对之前的数据进行覆盖,保证一个攻击只出现一次。并且保证党安全人员对该条文档操作的时候,结果字段和评论字段保留不覆盖,其他字段可以覆盖
操作:
确定的一件事是把url作为索引中唯一的标识,保证索引唯一
如下
对于es来说id重复的话,之前的会被覆盖,整个流程是先根据id删除之前的 ,然后在新增一条新数据,这样的后果是如果安全人员在该条日志上做了评论,则重复的id会把评论字段覆盖掉。所以需要解决如下问题。
尝试一:
输出中有个action字段,action的可选值为index,create,update,detele。默认为index。当时选择了update,但是发现采用update的时候会出现这种情况:
如果id存在,则覆盖,如果id不存在,则打印一条错误日志(不会影响logstash的运行)
这样的话实际上还是会覆盖,不能满足我们的要求
action
Value type is string
Default value is "index"
Protocol agnostic (i.e. non-http, non-java specific) configs go here Protocol agnostic methods The Elasticsearch action to perform. Valid actions are:
index: indexes a document (an event from Logstash).
delete: deletes a document by id (An id is required for this action)
create: indexes a document, fails if a document by that id already exists in the index.
update: updates a document by id. Update has a special case where you can upsert — update a document if not already present. See the upsert option. NOTE: This does not work and is not supported in Elasticsearch 1.x. Please upgrade to ES 2.x or greater to use this feature with Logstash!
尝试二:
elasticsearch的输出中添加doc_as_upsert => true
但是尝试之后发现
当id存在的时候会覆盖,当id不存的时候会新增,这样也不满足我们的需求。所以这种方法也不适合
尝试三:
我们不采用覆盖的方式,如果之前_id存在的话,直接抛弃掉新数据。这样的话评论之类的数据不会被覆盖。如果_id不存在的话 直接新增。
解决方案是把尝试一中的action由update改为create
这个方法解决了我们的问题
参考:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-doc_as_upsert
Logstash收集数据并写入Mysql数据库
ES中的日志后续会被删除,但有些重要数据,比如状态码、客户端IP、客户端浏览器版本等,后期可以会按月或年做数据统计等。因此需要持久保存
1.安装Mysql数据库并修改配置
apt-get -y install mysql-server
#修改配置
vim /etc/mysql/mysql.conf.d/mysqld.cnf
bind-address = 0.0.0.0
#重启
systemctl restart mysql
2.创建库和表并授权用户登录
#进入mysql界面后执行如下操作:
create database elk character set utf8 collate utf8_bin;
create user elk@% identified by 123456;
grant all privileges on elk.* to elk@%;
flush privileges;
use elk
#创建表,字段对应需要保存的数据字段
create table elklog (clientip varchar(39),responsetime float(10,3),uri varchar(256),status char(3),time timestamp default current_timestamp );
3.Logstash 配置 mysql-connector-java包
官方下载地址:https://dev.mysql.com/downloads/
#在logstash服务器执行
dpkg -i mysql-connector-j_8.0.32-1ubuntu22.04_all.deb
mkdir -p /usr/share/logstash/vendor/jar/jdbc
cp /usr/share/java/mysql-connector-j-8.0.32.jar /usr/share/logstash/vendor/jar/jdbc
4.更改gem源
Logstash 基于 Ruby 语言实现。Ruby 语言使用国外的gem源, 由于网络原因,从国内访问很慢而且不稳定
#在logstash服务器执行
apt-get -y install ruby
#由于默认源是国外的,需要指定为国内源
gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
5.Logstash安装对应插件
查看已经安装有关jdbc的插件
还需要安装output-jdbc插件
/usr/share/logstash/bin/logstash-plugin install logstash-output-jdbc
查看是否安装成功,如果有logstash-output-jdbc说明安装成功:
/usr/share/logstash/bin/logstash-plugin list | grep jdbc
6.配置Logstash将数据写入数据库
vim nginx_to_mysql_es.conf
input
file
path => "/var/log/nginx/access.log"
type => "nginx-accesslog"
#指定输入格式为json
codec => json
start_position => "beginning"
stat_interval => "3"
output
if [type] == "nginx-accesslog"
elasticsearch
hosts => ["192.168.131.11:9200","192.168.131.12:9200","192.168.131.13:9200"]
index => "nginx-accesslog-%+YYYY.MM.dd"
jdbc
#指定mysql连接驱动
driver_jar_path => "/usr/share/logstash/vendor/jar/jdbc/mysql-connector-j-8.0.32.jar"
connection_string => "jdbc:mysql://192.168.131.14/elk?user=elk&password=123456&useUnicode=true&characterEncoding=UTF8"
statement => ["INSERT INTO elklog(clientip,responsetime,uri,status) VALUES(?,?,?,?)","clientip","responsetime","uri","status"]
之后重启logstash
执行:/usr/share/logstash/bin/logstash -f nginx_to_mysql_es.conf
向nginx日志追加内容:head -n 10 access_json.log-20220304 >> /var/log/nginx/access.log
此时查看数据库,发现已经有数据了:
以上是关于logstash写入es重复id的主要内容,如果未能解决你的问题,请参考以下文章