将 Elasticsearch 中的数据读入 Flink 聚合?

Posted

技术标签:

【中文标题】将 Elasticsearch 中的数据读入 Flink 聚合?【英文标题】:Reading data form Elasticsearch into Flink aggregation? 【发布时间】:2019-05-28 15:18:54 【问题描述】:

我正在尝试使用 Kafka 消息(作为 StreamSource)更新 Elasticsearch 中的文档。使用 windows 和 Elasticsearch 连接器作为接收器批量写入 Elasticsearch 很好,但是,我们需要更新文档中的现有数据并以批量执行的方式读取它(不是针对每个元组,而是针对例如在byKey() 我们要聚合的拆分)

我们现在正在使用 Storm Trident,它在 persistentAggregate 之前执行批量读取,并在之后将更新的聚合写回,从而最大限度地减少与后端的交互。我只是在 Flink 中找不到类似的东西——有什么提示吗?

【问题讨论】:

【参考方案1】:

如何在流上运行两个窗口调用 -

window1 - 从 elasticsearch 批量读取

window2 - 批量进入弹性搜索。

streamData
  .window1(bulkRead and update/join)
  .processFunction(...)
  .window2(BulkPush)
您可以使用任何合适的方法进行批量读取,例如Storm Trident。 在 window2 中使用 BulkProcessor link

【讨论】:

感谢您的回答!这需要为每个 window1 加载,而不是有一个内部缓存并且只加载尚未存在的键 - 但你是对的,我可能必须在 processFunction 中加载、更新和保存所有内容 - 看起来非常手动和非性能 wrt 数据库访问和批量读取/写入。不过感谢您的提示!

以上是关于将 Elasticsearch 中的数据读入 Flink 聚合?的主要内容,如果未能解决你的问题,请参考以下文章

二进制文件读入

二进制文件读入

Elasticsearch:Data pipeline: Kafka => Flink => Elasticsearch

将 Access 数据库中的数据读入列表框

将文件中的数据读入矢量对象

将多个文件中的数据读入单个 RDD 或 Dataframe