将 Elasticsearch 中的数据读入 Flink 聚合?
Posted
技术标签:
【中文标题】将 Elasticsearch 中的数据读入 Flink 聚合?【英文标题】:Reading data form Elasticsearch into Flink aggregation? 【发布时间】:2019-05-28 15:18:54 【问题描述】:我正在尝试使用 Kafka 消息(作为 StreamSource)更新 Elasticsearch 中的文档。使用 windows 和 Elasticsearch 连接器作为接收器批量写入 Elasticsearch 很好,但是,我们需要更新文档中的现有数据并以批量执行的方式读取它(不是针对每个元组,而是针对例如在byKey()
我们要聚合的拆分)
我们现在正在使用 Storm Trident,它在 persistentAggregate
之前执行批量读取,并在之后将更新的聚合写回,从而最大限度地减少与后端的交互。我只是在 Flink 中找不到类似的东西——有什么提示吗?
【问题讨论】:
【参考方案1】:如何在流上运行两个窗口调用 -
window1
- 从 elasticsearch 批量读取
window2
- 批量进入弹性搜索。
streamData
.window1(bulkRead and update/join)
.processFunction(...)
.window2(BulkPush)
您可以使用任何合适的方法进行批量读取,例如Storm Trident
。
在 window2 中使用 BulkProcessor link
【讨论】:
感谢您的回答!这需要为每个 window1 加载,而不是有一个内部缓存并且只加载尚未存在的键 - 但你是对的,我可能必须在 processFunction 中加载、更新和保存所有内容 - 看起来非常手动和非性能 wrt 数据库访问和批量读取/写入。不过感谢您的提示!以上是关于将 Elasticsearch 中的数据读入 Flink 聚合?的主要内容,如果未能解决你的问题,请参考以下文章
Elasticsearch:Data pipeline: Kafka => Flink => Elasticsearch