Logstash 2.3.4如何使用logstash-jdbc插件在Elasticsearch中加载嵌套文档

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Logstash 2.3.4如何使用logstash-jdbc插件在Elasticsearch中加载嵌套文档相关的知识,希望对你有一定的参考价值。

我目前使用 Elasticsearch 2.3.4 和 Logstash 2.3.4,通过 logstash-jdbc 插件将 Oracle 数据库中的关系数据加载到 Elasticsearch 索引中。按照多篇帖子的建议,我使用了 aggregate(聚合)过滤器,但仍然无法在文档中加载内部嵌套对象:这些值没有映射到字段,而是显示为 NULL。

我有两个相互关联的实体,其表结构和数据如下:

    -- Parent table: one row per department.
    CREATE TABLE DEPARTMENT (
        id NUMBER PRIMARY KEY,
        name VARCHAR2(4000) NOT NULL
    );

    -- Child table: each employee optionally references a department
    -- (departmentid is nullable).
    CREATE TABLE EMPLOYEE (
        id NUMBER PRIMARY KEY,
        name VARCHAR2(4000) NOT NULL,
        departmentid NUMBER,
        CONSTRAINT EMPLOYEE_FK FOREIGN KEY (departmentid) REFERENCES DEPARTMENT(id)
    );

    -- Sample departments. Department 4 has no employees in this sample data.
    INSERT INTO DEPARTMENT (id, name) VALUES (1, 'dept1');
    INSERT INTO DEPARTMENT (id, name) VALUES (2, 'dept2');
    INSERT INTO DEPARTMENT (id, name) VALUES (3, 'dept3');
    INSERT INTO DEPARTMENT (id, name) VALUES (4, 'dept4');

    -- Sample employees: three in dept1, two in dept2, one in dept3.
    INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (1, 'emp1', 1);
    INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (2, 'emp2', 1);
    INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (3, 'emp3', 1);
    INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (4, 'emp4', 2);
    INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (5, 'emp5', 2);
    INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (6, 'emp6', 3);

这是我的映射:

   
    {
        "mappings": {
            "departments": {
                "properties": {
                    "id": {
                        "type": "integer"
                    },
                    "deptName": {
                        "type": "string"
                    },
                    "employee_details": {
                        "type": "nested",
                        "properties": {
                            "empId": {
                                "type": "integer"
                            },
                            "empName": {
                                "type": "string"
                            }
                        }
                    }
                }
            }
        }
    }

这是我的logstash配置:

  input {
        # Read one row per (department, employee) pair from Oracle; departments
        # without employees still produce a row because of the LEFT JOIN.
        jdbc {
            jdbc_validate_connection => true
            jdbc_connection_string => "jdbc:oracle:thin:@host:port:db"
            jdbc_user => "user"
            jdbc_password => "pwd"
            jdbc_driver_library => "../vendor/jar/ojdbc14.jar"
            jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
            statement => "SELECT
                            department.id AS id,
                            department.name AS deptName,
                            employee.id AS empId,
                            employee.name AS empName
                        FROM  department LEFT JOIN employee
                        ON department.id = employee.departmentid
                        ORDER BY id"
        }
    }

    filter {
        # Collect consecutive rows that share the same department id into a
        # single map holding an employee_details array.
        # NOTE: the stdout sample in this article shows the incoming events
        # carrying lower-cased field names (deptname, empid, empname), so the
        # mixed-case lookups below return nil. That is the problem being asked
        # about; the lookups are left as posted.
        aggregate {
            task_id => "%{id}"
            code => "
            map['id'] = event['id']
            map['deptName'] = event['deptName'] #solution - deptName should be in smaller case and other fields too.
            map['employee_details'] ||= []
            map['employee_details'] << {'empId' => event['empId'], 'empName' => event['empName']}
            "

            push_previous_map_as_event => true
            timeout => 5
            timeout_tags => ['aggregated']
        }
    }

    output {
        # Print every event for debugging, then index it by department id.
        stdout { codec => rubydebug }
        elasticsearch {
            action => "index"
            index => "my_index"
            document_type => "departments"
            document_id => "%{id}"
            hosts => "localhost:9200"
        }
    }

当我对所有文档执行 XGET 查询时:curl -XGET 'localhost:9200/my_index/_search/?pretty=true&q=*:*'

这些值未映射到字段并显示为NULL:

    {
      "took": 1,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
      },
      "hits": {
        "total": 4,
        "max_score": 1,
        "hits": [
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "2",
            "_score": 1,
            "_source": {
              "id": 2,
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                },
                {
                  "empId": null,
                  "empName": null
                }
              ],
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.477Z",
              "tags": [
                "aggregated"
              ]
            }
          },
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "4",
            "_score": 1,
            "_source": {
              "id": 4,
              "deptname": "dept4",
              "empid": null,
              "empname": null,
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.367Z",
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                }
              ]
            }
          },
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "1",
            "_score": 1,
            "_source": {
              "id": 1,
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                },
                {
                  "empId": null,
                  "empName": null
                },
                {
                  "empId": null,
                  "empName": null
                }
              ],
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.477Z",
              "tags": [
                "aggregated"
              ]
            }
          },
          {
            "_index": "my_index",
            "_type": "departments",
            "_id": "3",
            "_score": 1,
            "_source": {
              "id": 3,
              "deptName": null,
              "employee_details": [
                {
                  "empId": null,
                  "empName": null
                }
              ],
              "@version": "1",
              "@timestamp": "2019-05-14T10:47:33.492Z",
              "tags": [
                "aggregated"
              ]
            }
          }
        ]
      }
    }

rubydebug 的输出显示这些值被设置成了 'nil'。有人能帮我看看我哪里做错了吗?

这里是标准输出的id为1的文档的片段:


{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 1.0,
       "empname" => "emp1",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:14.272Z"
}
{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 2.0,
       "empname" => "emp2",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:15.272Z"
}
{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 3.0,
       "empname" => "emp3",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:15.272Z"
}
{
                  "id" => 1.0,
            "deptName" => nil,
    "employee_details" => [
        [0] {
              "empId" => nil,
            "empName" => nil
        },
        [1] {
              "empId" => nil,
            "empName" => nil
        },
        [2] {
              "empId" => nil,
            "empName" => nil
        }
    ],
            "@version" => "1",
          "@timestamp" => "2019-05-14T12:32:15.381Z",
                "tags" => [
        [0] "aggregated"
    ]
}

答案

以下代码对我有用。

input {
    # Read one row per employee/title/department combination from MySQL,
    # ordered by employee number so rows of one employee arrive together.
    jdbc {
        jdbc_validate_connection => true
        jdbc_connection_string => "----/employees"
        jdbc_user => "---"
        jdbc_password => "--"
        jdbc_driver_library => "/home/ilsa/mysql-connector-java-5.1.36-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        statement => "SELECT
            e.emp_no as employee_number,
            birth_date, first_name, last_name, gender, hire_date, t.title  AS titlename,
            t.from_date AS titlefrom_date, t.to_date AS titleto_date, d.dept_no AS departmentnumber,
            ds.dept_name AS departmentname, d.from_date AS departmentfrom_date, d.to_date AS departmentto_date
        FROM employees e
        LEFT JOIN(titles t, dept_emp d, departments ds)
        ON(e.emp_no = t.emp_no AND e.emp_no = d.emp_no AND d.dept_no = ds.dept_no AND t.from_date < d.to_date AND t.to_date >   d.from_date)
        ORDER BY e.emp_no ASC"
    }
}

filter {
    # Merge all rows of one employee into a single map with a 'roles' array.
    # All field names read here are lower-case, matching the SELECT aliases.
    # NOTE(review): event.get(...) appears to be the Logstash 5.x+ event API;
    # on Logstash 2.x the equivalent is event['field'] - confirm against the
    # version in use.
    aggregate {
        task_id => "%{employee_number}"
        code => "
            map['employee_number'] = event.get('employee_number')
            map['birth_date'] = event.get('birth_date')
            map['first_name'] = event.get('first_name')
            map['last_name'] = event.get('last_name')
            map['gender'] = event.get('gender')
            map['hire_date'] = event.get('hire_date')
            map['roles'] ||= []
            map['roles'] << {
                'title.name' => event.get('titlename'),
                'title.from_date' => event.get('titlefrom_date'),
                'title.to_date' => event.get('titleto_date'),
                'department.number' => event.get('departmentnumber'),
                'department.name' => event.get('departmentname'),
                'department.from_date' => event.get('departmentfrom_date'),
                'department.to_date' => event.get('departmentto_date')
            }
            event.cancel()"
        # event.cancel() above drops the raw row event, so only the pushed
        # aggregated maps reach the outputs.
        push_previous_map_as_event => true
        timeout => 30
    }
}

output {
    # Print every event for debugging, then index it by employee number.
    stdout { codec => rubydebug }
    elasticsearch {
        action => "index"
        index => "employees"
        document_type => "employee"
        document_id => "%{employee_number}"
        hosts => "localhost:9200"
    }
}


以上是关于Logstash 2.3.4如何使用logstash-jdbc插件在Elasticsearch中加载嵌套文档的主要内容,如果未能解决你的问题,请参考以下文章

Logstash 2.3.4如何使用logstash-jdbc插件在Elasticsearch中加载嵌套文档

Logstash 常用 filter 插件介绍

干货 | Logstash自定义正则表达式ETL实战

elk/elasticsearch+fluentd+kibana

ElasticSearch——自定义模板

ELK 日志分析系统