巧用 Logstash 实现服务端文档 Join
Posted 以无限为有限
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了巧用 Logstash 实现服务端文档 Join相关的知识,希望对你有一定的参考价值。
记得关注这里噢!
小
问题
下面的每行文本为一条日志消息,请问如何才能将它们按照字段 thread_id 进行聚合?
{"thread_id":"123", "a":"message A11"}
{"thread_id":"345", "message":"message E12", "code": "1000"}
{"thread_id":"234", "b":"message C11"}
{"thread_id":"234", "f":"message D11"}
{"thread_id":"123", "b":"message B11"}
啊,怎么办
直接采集这些日志,每一行会产生一个索引文档,他们之间是独立的,没有办法进行关联,查询的时候没有办法进行 Join,而查询的要求是希望进行这些的字段的关联分析,独立分散的文档不满足查询需求。那怎么 Join?
Elasticsearch 不支持查询端 Join(Parent/Nested不满足我们的要求),那么在索引的时候做吧。
很明显,通过分析这些日志会发现,他们的顺序是混杂一起的,没有办法通过多行的方式来进行合并 Join,那么有没有一种简单的方法来合并呢?
继续分析,我们还可以看到他们,拥有相同 thread_id 值的消息,除了都有一个主要的 thread_id 字段用于关联外,他们的字段名称不一样,可以理解为他们是不同的数据,我们可以将它们直接合成一个消息也不会有冲突。
那么怎么合并呢,自己写代码,肯定可以,我还希望他们在输出日志的时候就组合好呢,哎,有没有简单点的方法呢?
恩,我们直接可以使用Logstash 来做,Logstash 的 Elasticsearch Output 的 Upsert 功能就可以了。
代码走起
首先定义一个简单的 Pipeline:
input {
stdin{}
}
filter {
json {
source => message
}
if ![thread_id] {
uuid {
target => "thread_id"
overwrite => true
}
}
}
output {
stdout{}
elasticsearch {
hosts => ["http://127.0.0.1:9200"]
index => "test234"
document_id => "%{thread_id}"
doc_as_upsert => true
action => "update"
}
}
简单说明一下,使用的 stdin 终端数据输入作为测试,经过 json filter 解析成结构化的字段,然后如果文档里面没有 thread_id 这个字段会自动通过 uuid filter 生成一个,同时我们自定义了文档的唯一 id,最重要的是在 output 的地方使用参数设置为 update 操作,进行 upsert 合并插入。
调试一下!看看结果
启动 Logstash,将上面的测试数据贴到终端下面执行,查看 Elasticsearch 的索引数据:
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "test234",
"_type" : "_doc",
"_id" : "345",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"thread_id" : "345",
"code" : "1000",
"@timestamp" : "2019-12-27T15:28:23.482Z",
"message" : "message E12",
"host" : "H2-Linux"
}
},
{
"_index" : "test234",
"_type" : "_doc",
"_id" : "123",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"thread_id" : "123",
"@timestamp" : "2019-12-27T15:28:23.482Z",
"message" : """{"thread_id":"123", "a":"message A11"}""",
"a" : "message A11",
"host" : "H2-Linux"
}
},
{
"_index" : "test234",
"_type" : "_doc",
"_id" : "234",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"thread_id" : "234",
"@timestamp" : "2019-12-27T15:28:23.483Z",
"message" : """{"thread_id":"234", "f":"message D11"}""",
"b" : "message C11",
"host" : "H2-Linux",
"f" : "message D11"
}
}
]
}
}
可以看到,Elasticsearch 端的数据已经聚合好了,只有 3 条数据,相关的字段都好好的躺在里面。
小结一下吧!
通过设置 Elasticsearch 的 Output,使用 upsert 特性即可实现文档在服务端的 Join,可以用来解决一些特殊的需要将文档进行聚合的场景,使用起来非常简单方便,不过要注意的是,upsert 相比普通的 index 操作是需要产生额外的开销的,因为更新需要先拉取再索引,如果日志里面不是所有的文档需要 upsert,我们可以在 filter 阶段打上标签,在 Output 阶段通过标签进行路由选择是否进行 upsert 操作。
[1]相关链接:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-upsert
小小Tips,希望对你有帮助,记得点赞/转发+“在看”
以上是关于巧用 Logstash 实现服务端文档 Join的主要内容,如果未能解决你的问题,请参考以下文章
第十期基于 ApolloKoa 搭建 GraphQL 服务端
使用 WPF + Chrome 内核实现 在线客服系统 的复合客服端程序