多线程和logstash聚合过滤器
Posted
技术标签:
【中文标题】多线程和logstash聚合过滤器【英文标题】:Multiple threads and logstash aggregation filter 【发布时间】:2017-02-06 15:12:27 【问题描述】:我有一条从 SQL 数据库到 Elasticsearch 的管道,如下所示:
-
使用 logstash-input-jdbc 从 SQL 数据库输入
单个事件的各种过滤和突变
使用 logstash-filter-aggregate 根据 group_id 属性聚合事件
使用 logstash-output-elasticsearch 将聚合事件输出到 Elasticsearch
事实上,这条管道的吞吐量非常低。我知道这是由于聚合步骤(执行一些相对繁重的处理),我想使用多个线程/进程来提高性能(允许我使用多个内核)。
但是,logstash-filter-aggregate 插件不支持多个过滤工作者——大概是因为它无法保证应该组合成一个聚合事件的事件将由同一个工作者处理。
我目前对此的解决方案是运行多个 logstash 实例,其中每个实例从 SQL 数据库中选择 group_ids 的某个子集。但是,这有相当多的开销。有没有更好的方法通过 logstash-filter-aggregate 使用多核?
【问题讨论】:
【参考方案1】:你在一个小洞里,那里。 Aggregate
是需要对事件流进行序列化的过滤器之一,因为它认为需要的状态是一个或多个事件。任何时候你需要序列化,你的吞吐量都将被限制在一个核心上,以确保过滤器工作人员看到它需要的所有事件。与数据库一样,解决此问题的方法是将数据集分片。你已经发现了。
确实,分片是解决此问题的最佳单阶段 Logstash 解决方案。
有一个多阶段的解决方案,如果你想去那里。那就是建立第二条管道。它会像这样工作:
-
第一个管道提取事件并将相关事件标记为相关,但不执行聚合。
与您一样,第一个管道输出到
elasticsearch
。
第二个管道使用elasticsearch
输入 来查询看起来像是未聚合事件的事件。
聚合层聚合完整的事件(包含所有部分)
聚合事件被刷新到 Elasticsearch。
或者,您可以使用非 logstash 方法在 ElasticSearch 中执行聚合。
【讨论】:
感谢您的建议。我想我希望的是一种在单个 logstash 实例中进行分片的方法,但这似乎是不可能的。以上是关于多线程和logstash聚合过滤器的主要内容,如果未能解决你的问题,请参考以下文章
从静态代码扫描引擎PMD源码学习-多线程任务模型和File过滤设计