Logstash -> Elasticsearch - 更新非规范化数据

Posted

技术标签:

【中文标题】Logstash -> Elasticsearch - 更新非规范化数据【英文标题】:Logstash -> Elasticsearch - update denormalized data 【发布时间】:2017-06-17 23:38:57 【问题描述】:

用例说明

我们有一个关系数据库,其中包含有关我们日常运营的数据。目标是允许用户使用全文搜索引擎搜索重要数据。数据被规范化,因此不是进行全文查询的最佳形式,因此我们的想法是对数据的子集进行非规范化并将其实时复制到 Elasticsearch,这使我们能够创建快速准确的搜索应用程序.

我们已经有一个系统可以启用Event Sourcing 的数据库操作(插入、更新、删除)。事件仅包含更改的列和主键(在更新时我们没有得到整行)。 Logstash 已经收到每个事件的通知,因此这部分已经处理完毕。


实际问题

现在我们正在解决我们的问题。由于计划是对我们的数据进行非规范化,因此我们必须确保将父对象的更新传播到 Elasticsearch 中的非规范化子对象。我们如何配置 logstash 来做到这一点?

示例

假设我们在 Elasticsearch 中维护了一个 Employees 列表。每个Employee 都分配给一个Company。由于数据是非规范化的(为了更快的搜索),每个Employee 还带有Company 的名称和地址。更新更改了 Company 的名称 - 我们如何配置 logstash 以更新分配给 Company 的所有 Employees 中的公司名称?


补充说明

@Darth_Vader: 我们面临的问题是,我们得到一个 Company 已更改的事件,但我们想在 Elasticsearch 中修改 Employee 类型的文档,因为它们本身携带有关公司的数据。您的回答期望我们会为每个 Employee 获得一个事件,但事实并非如此。

也许这会让它更清楚。我们在 Elasticsearch 中有 3 名员工:

type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company A'
type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company A'
type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'

然后在源数据库中发生更新。

UPDATE company SET name = 'Company NEW' WHERE cmp_id = 1;

我们在 logstash 中收到一个事件,它的内容如下:

type:'company',cmp_id:'1',old.name:'Company A',new.name:'Company NEW'

然后应该将其传播到 Elasticsearch,以便生成的员工是:

type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company NEW'
type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company NEW'
type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'

请注意,company.name 字段已更改。

【问题讨论】:

您的意思是更新所有员工公司名称,并将公司字段分配给每个人他们? 更新不一定会影响所有员工,而是那些在变更公司工作的员工。是的,每个员工都有一个字段company.idcompany.namecompany.address 等。 【参考方案1】:

我建议使用与我发布的 here 类似的解决方案,即使用 http 输出插件通过对 Employee 索引的查询调用发出更新。查询需要如下所示:

POST employees/_update_by_query

  "script": 
    "source": "ctx._source.company.name = params.name",
    "lang": "painless",
    "params": 
      "name": "Company NEW"
    
  ,
  "query": 
    "term": 
      "company.cmp_id": "1"
    
  

所以您的 Logstash 配置应该如下所示:

input 
  ... 

filter 
  mutate 
    add_field => 
      "[script][lang]" => "painless"
      "[script][source]" => "ctx._source.company.name = params.name"
      "[script][params][name]" => "%new.name"
      "[query][term][company.cmp_id]" => "%cmp_id"
    
    remove_field => ["host", "@version", "@timestamp", "type", "cmp_id", "old.name", "new.name"]
  

output 
  http 
    url => "http://localhost:9200/employees/_update_by_query"
    http_method => "post"
    format => "json"
  

【讨论】:

自从我发布问题以来已经很长时间了,所以我不记得任何细节,但如果我没记错的话,我还使用 http 输出插件通过查询请求发送更新。我相信你的配置是正确的。 知道了,我通过github issue中的链接偶然发现了这篇文章

以上是关于Logstash -> Elasticsearch - 更新非规范化数据的主要内容,如果未能解决你的问题,请参考以下文章

ES 译文之如何使用 Logstash 实现关系型数据库与 ElasticSearch 之间的数据同步

ES 译文之如何使用 Logstash 实现关系型数据库与 ElasticSearch 之间的数据同

Linux ELK日志分析系统 | logstash日志收集 | elasticsearch 搜索引擎 | kibana 可视化平台 | 架构搭建 | 超详细

震惊全网的ELK日志分析系统(齐全详细理论+搭建步骤图释)

震惊全网的ELK日志分析系统(齐全详细理论+搭建步骤图释)

基于ELK+Beats进行系统监控