How to load nested documents into Elasticsearch with the logstash-jdbc plugin in Logstash 2.3.4
I am currently using Elasticsearch 2.3.4 and Logstash 2.3.4, loading relational data from an Oracle DB into my Elasticsearch index with the logstash-jdbc plugin. As suggested in various posts, I use the aggregate filter for this. I still cannot get the inner nested objects loaded into the documents: the values are not mapped to their fields and show up as NULL.
I have two related entities with the following data:
CREATE TABLE DEPARTMENT (
id NUMBER PRIMARY KEY,
name VARCHAR2(4000) NOT NULL
)
CREATE TABLE EMPLOYEE (
id NUMBER PRIMARY KEY,
name VARCHAR2(4000) NOT NULL,
departmentid NUMBER,
CONSTRAINT EMPLOYEE_FK FOREIGN KEY (departmentid) REFERENCES DEPARTMENT(id)
)
insert into DEPARTMENT values (1, 'dept1');
insert into DEPARTMENT values (2, 'dept2');
insert into DEPARTMENT values (3, 'dept3');
insert into DEPARTMENT values (4, 'dept4');
insert into EMPLOYEE values (1, 'emp1', 1);
insert into EMPLOYEE values (2, 'emp2', 1);
insert into EMPLOYEE values (3, 'emp3', 1);
insert into EMPLOYEE values (4, 'emp4', 2);
insert into EMPLOYEE values (5, 'emp5', 2);
insert into EMPLOYEE values (6, 'emp6', 3);
Here is my mapping:
"mappings":
"departments":
"properties":
"id":
"type": "integer"
,
"deptName":
"type": "string"
,
"employee_details":
"type": "nested",
"properties":
"empId":
"type": "integer"
,
"empName":
"type": "string"
Here is my logstash config:
input {
  jdbc {
    jdbc_validate_connection => true
    jdbc_connection_string => "jdbc:oracle:thin:@host:port:db"
    jdbc_user => "user"
    jdbc_password => "pwd"
    jdbc_driver_library => "../vendor/jar/ojdbc14.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    statement => "SELECT
        department.id AS id,
        department.name AS deptName,
        employee.id AS empId,
        employee.name AS empName
      FROM department LEFT JOIN employee
        ON department.id = employee.departmentid
      ORDER BY id"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event['id']
      map['deptName'] = event['deptName'] # solution - deptName should be in lower case, and the other fields too
      map['employee_details'] ||= []
      map['employee_details'] << {'empId' => event['empId'], 'empName' => event['empName']}
    "
    push_previous_map_as_event => true
    timeout => 5
    timeout_tags => ['aggregated']
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    action => "index"
    index => "my_index"
    document_type => "departments"
    document_id => "%{id}"
    hosts => "localhost:9200"
  }
}
When I run a GET across all documents with curl -XGET 'localhost:9200/my_index/_search/?pretty=true&q=*:*',
the values are not mapped to their fields and show up as null:
"took": 1,
"timed_out": false,
"_shards":
"total": 5,
"successful": 5,
"failed": 0
,
"hits":
"total": 4,
"max_score": 1,
"hits": [
"_index": "my_index",
"_type": "departments",
"_id": "2",
"_score": 1,
"_source":
"id": 2,
"deptName": null,
"employee_details": [
"empId": null,
"empName": null
,
"empId": null,
"empName": null
],
"@version": "1",
"@timestamp": "2019-05-14T10:47:33.477Z",
"tags": [
"aggregated"
]
,
"_index": "my_index",
"_type": "departments",
"_id": "4",
"_score": 1,
"_source":
"id": 4,
"deptname": "dept4",
"empid": null,
"empname": null,
"@version": "1",
"@timestamp": "2019-05-14T10:47:33.367Z",
"deptName": null,
"employee_details": [
"empId": null,
"empName": null
]
,
"_index": "my_index",
"_type": "departments",
"_id": "1",
"_score": 1,
"_source":
"id": 1,
"deptName": null,
"employee_details": [
"empId": null,
"empName": null
,
"empId": null,
"empName": null
,
"empId": null,
"empName": null
],
"@version": "1",
"@timestamp": "2019-05-14T10:47:33.477Z",
"tags": [
"aggregated"
]
,
"_index": "my_index",
"_type": "departments",
"_id": "3",
"_score": 1,
"_source":
"id": 3,
"deptName": null,
"employee_details": [
"empId": null,
"empName": null
],
"@version": "1",
"@timestamp": "2019-05-14T10:47:33.492Z",
"tags": [
"aggregated"
]
]
The rubydebug output suggests the values are being set to 'nil'. Can someone help me figure out what I am doing wrong here?
Here is a snippet from stdout for the document with id 1:
"id" => 1.0,
"deptname" => "dept1",
"empid" => 1.0,
"empname" => "emp1",
"@version" => "1",
"@timestamp" => "2019-05-14T12:32:14.272Z"
"id" => 1.0,
"deptname" => "dept1",
"empid" => 2.0,
"empname" => "emp2",
"@version" => "1",
"@timestamp" => "2019-05-14T12:32:15.272Z"
"id" => 1.0,
"deptname" => "dept1",
"empid" => 3.0,
"empname" => "emp3",
"@version" => "1",
"@timestamp" => "2019-05-14T12:32:15.272Z"
"id" => 1.0,
"deptName" => nil,
"employee_details" => [
[0]
"empId" => nil,
"empName" => nil
,
[1]
"empId" => nil,
"empName" => nil
,
[2]
"empId" => nil,
"empName" => nil
],
"@version" => "1",
"@timestamp" => "2019-05-14T12:32:15.381Z",
"tags" => [
[0] "aggregated"
]
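The stdout output above shows the field names the jdbc input actually produces: they are all lower case (deptname, empid, empname), so look-ups such as event['deptName'] in the aggregate code return nil. A minimal sketch of the corrected filter, assuming the jdbc input's default lower-casing of column names and keeping the camel-cased names only inside the aggregated map:
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event['id']
      map['deptName'] = event['deptname']   # read the lower-cased column name
      map['employee_details'] ||= []
      map['employee_details'] << {'empId' => event['empid'], 'empName' => event['empname']}
      event.cancel()                        # drop the raw per-row event, as in the answer below
    "
    push_previous_map_as_event => true
    timeout => 5
    timeout_tags => ['aggregated']
  }
}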
Answer
The following code works for me.
input {
  jdbc {
    jdbc_validate_connection => true
    jdbc_connection_string => "----/employees"
    jdbc_user => "---"
    jdbc_password => "--"
    jdbc_driver_library => "/home/ilsa/mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "SELECT
        e.emp_no as employee_number,
        birth_date, first_name, last_name, gender, hire_date, t.title AS titlename,
        t.from_date AS titlefrom_date, t.to_date AS titleto_date, d.dept_no AS departmentnumber,
        ds.dept_name AS departmentname, d.from_date AS departmentfrom_date, d.to_date AS departmentto_date
      FROM employees e
      LEFT JOIN (titles t, dept_emp d, departments ds)
        ON (e.emp_no = t.emp_no AND e.emp_no = d.emp_no AND d.dept_no = ds.dept_no AND t.from_date < d.to_date AND t.to_date > d.from_date)
      ORDER BY e.emp_no ASC"
  }
}
filter {
  aggregate {
    task_id => "%{employee_number}"
    code => "
      map['employee_number'] = event.get('employee_number')
      map['birth_date'] = event.get('birth_date')
      map['first_name'] = event.get('first_name')
      map['last_name'] = event.get('last_name')
      map['gender'] = event.get('gender')
      map['hire_date'] = event.get('hire_date')
      map['roles'] ||= []
      map['roles'] << {
        'title.name' => event.get('titlename'),
        'title.from_date' => event.get('titlefrom_date'),
        'title.to_date' => event.get('titleto_date'),
        'department.number' => event.get('departmentnumber'),
        'department.name' => event.get('departmentname'),
        'department.from_date' => event.get('departmentfrom_date'),
        'department.to_date' => event.get('departmentto_date')
      }
      event.cancel()"
    push_previous_map_as_event => true
    timeout => 30
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    action => "index"
    index => "employees"
    document_type => "employee"
    document_id => "%{employee_number}"
    hosts => "localhost:9200"
  }
}