Logstash:Jdbc static filter plugin 介绍
Posted 中国社区官方博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Logstash:Jdbc static filter plugin 介绍相关的知识,希望对你有一定的参考价值。
在 Logstash 的过滤器中。有许多的过滤器是可以用来和外部的数据对 pipeline 里的数据进行丰富的。Jdbc_static 过滤器就是其中的一个。此过滤器使用从远程数据库预加载的数据丰富事件。我们之所以选择这个,是因为你不必频繁查询你的数据库,也不会给你的数据库带来很大的压力。
此过滤器最适合使用静态或不经常更改的参考数据(例如环境、用户和产品)来丰富事件。
此过滤器的工作方式是从远程数据库获取数据,将其缓存在本地内存中的 Apache Derby 数据库中,并使用查找来使用本地数据库中缓存的数据来丰富事件。 你可以将过滤器设置为一次加载远程数据(对于静态数据),或者你可以安排远程加载定期运行(对于需要刷新的数据)。
在本篇文章中,我们将详细地介绍如何使用这个过滤器来丰富数据。首先,我们可以参考我之前的另外一篇文章 “Logstash:运用 jdbc_streaming 来丰富我们的数据” 来设置如何访问 mysql 数据以及在 Jdbc stack 过滤器中如何配置访问 MySQL。
安装 MySQL
如果你还没有安装及配置好你的 MySQL,请参照我之前的文章 “Logstash:把 MySQL 数据导入到 Elasticsearch 中” 来进行安装及配置好自己的 MySQL 数据库。记得把相应的 Connector 放入相应的文件目录中。
安装好我们的 MySQL 后,我们创建一个叫做 data 的数据库,并创建一个叫做 sensors 及一个叫做 users 的表格:
上面的两个表格非常简单。我们可以使用 MySQL 命令查看它们的内容:
mysql> describe sensors;
+-----------------+---------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------------+---------------+------+-----+---------+-------+
| customer | varchar(255) | YES | | NULL | |
| room | varchar(255) | YES | | NULL | |
| floor | varchar(255) | YES | | NULL | |
| department | varchar(255) | YES | | NULL | |
| locationOnFloor | varchar(255) | YES | | NULL | |
| sensorType | varchar(255) | YES | | NULL | |
| buildingName | varchar(255) | YES | | NULL | |
| longitude | double(255,0) | YES | | NULL | |
| latitude | double(255,0) | YES | | NULL | |
| id | int(255) | NO | PRI | NULL | |
+-----------------+---------------+------+-----+---------+-------+
10 rows in set (0.00 sec)
mysql> select * from sensors;
+----------+------+---------+-------------+-----------------+-------------+--------------+-----------+----------+----+
| customer | room | floor | department | locationOnFloor | sensorType | buildingName | longitude | latitude | id |
+----------+------+---------+-------------+-----------------+-------------+--------------+-----------+----------+----+
| Elastic | 101 | Floor 1 | Engineering | Desk 102 | Temperature | 222 Broadway | -74 | 41 | 1 |
+----------+------+---------+-------------+-----------------+-------------+--------------+-----------+----------+----+
1 row in set (0.00 sec)
mysql> describe users;
+-----------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+----------------+
| userid | int(11) | NO | PRI | NULL | auto_increment |
| firstname | varchar(255) | YES | | NULL | |
| lastname | varchar(255) | YES | | NULL | |
+-----------+--------------+------+-----+---------+----------------+
3 rows in set (0.00 sec)
mysql> select * from users;
+--------+-----------+----------+
| userid | firstname | lastname |
+--------+-----------+----------+
| 2 | xiaoguo | liu |
+--------+-----------+----------+
1 row in set (0.00 sec)
这样,我们就完成了 MySQL 的部署。
使用 Jdbc 过滤器来丰富数据
接下来,我们想使用 sensors 表格中的 id 以及 users 表格中的 userid 来对我们的事件进行丰富。我们来创建如下的一个 Logstash 的配置文件:
logstash.conf
input {
generator {
message => "id=1&userid=2"
count => 1
}
}
filter {
kv {
source => "message"
field_split => "&?"
}
mutate {
convert => {
"id" => "integer"
"userid" => "integer"
}
}
jdbc_static {
loaders => [
{
id => "remote-sensors"
query => "select id, customer, room, sensorType from sensors"
local_table => "sensors"
},
{
id => "remote-users"
query => "select firstname, lastname, userid from users order by userid"
local_table => "users"
}
]
local_db_objects => [
{
name => "sensors"
index_columns => ["id"]
columns => [
["id", "int"],
["customer", "varchar(255)"],
["room", "varchar(255)"],
["sensorType", "varchar(255)"]
]
},
{
name => "users"
index_columns => ["userid"]
columns => [
["firstname", "varchar(255)"],
["lastname", "varchar(255)"],
["userid", "int"]
]
}
]
local_lookups => [
{
id => "local-servers"
query => "select customer, room, sensorType from sensors WHERE id = :id"
parameters => {id => "[id]"}
target => "server"
},
{
id => "local-users"
query => "select firstname, lastname from users WHERE userid = :userid"
parameters => {userid => "[userid]"}
target => "user"
}
]
# using add_field here to add & rename values to the event root
add_field => { sensor_name => "%{[server][0][customer]} %{[server][0][room]} %{[server][0][sensortype]}" }
add_field => { customer_name => "%{[user][0][firstname]} %{[user][0][lastname]}" }
# remove_field => ["server", "user"]
jdbc_user => "root"
jdbc_password => "1234"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# jdbc_driver_library => ""
jdbc_connection_string => "jdbc:mysql://localhost:3306/data?useTimezone=true&&serverTimezone=UTC"
}
}
output {
stdout {
codec => rubydebug
}
}
在上面,我们使用 generator 来创建一个事件。我们在 filter 的部分使用 kv 过滤器来对它进行解析。由于 id 和 userid 为字符串。它们和数据库表格中的字段属性是不一样的。我们使用 mutate 过滤器来对它们的类型进行转换。接下来,我们使用 jdbc_static 过滤器来对这个事件进行丰富。
我们通过如下的命令来启动 Logstash:
./bin/logstash -f logstash.conf
我们可以在 terminal 中看到如下的结果:
从上面,我们可以看出来有两个新增加的字段 sensor_name 以及 customer_name。这两个字段是从 MySQL 中的表格中进行丰富而来。更多关于 Jdbc static 过滤器的介绍请参阅官方文档。
以上是关于Logstash:Jdbc static filter plugin 介绍的主要内容,如果未能解决你的问题,请参考以下文章
Windows系统上安装logstash和logstash-input-jdbc
logstash | logstash && logstash-input-jdbc 安装