巧用 Logstash 实现服务端文档 Join

Posted 以无限为有限

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了巧用 Logstash 实现服务端文档 Join相关的知识,希望对你有一定的参考价值。

   记得关注这里噢!


问题

下面的每行文本为一条日志消息,请问如何才能将它们按照字段 thread_id 进行聚合?

{"thread_id":"123", "a":"message A11"}

{"thread_id":"345", "message":"message E12", "code": "1000"}

{"thread_id":"234", "b":"message C11"}

{"thread_id":"234", "f":"message D11"}

{"thread_id":"123", "b":"message B11"}

啊,怎么办

巧用 Logstash 实现服务端文档 Join

直接采集这些日志,每一行会产生一个索引文档,他们之间是独立的,没有办法进行关联,查询的时候没有办法进行 Join,而查询的要求是希望进行这些的字段的关联分析,独立分散的文档不满足查询需求。那怎么 Join?


Elasticsearch 不支持查询端 Join(Parent/Nested不满足我们的要求),那么在索引的时候做吧。


很明显,通过分析这些日志会发现,他们的顺序是混杂一起的,没有办法通过多行的方式来进行合并 Join,那么有没有一种简单的方法来合并呢?


继续分析,我们还可以看到他们,拥有相同 thread_id 值的消息,除了都有一个主要的 thread_id 字段用于关联外,他们的字段名称不一样,可以理解为他们是不同的数据,我们可以将它们直接合成一个消息也不会有冲突。


那么怎么合并呢,自己写代码,肯定可以,我还希望他们在输出日志的时候就组合好呢,哎,有没有简单点的方法呢?


恩,我们直接可以使用Logstash 来做,Logstash 的 Elasticsearch Output 的 Upsert 功能就可以了。

巧用 Logstash 实现服务端文档 Join

代码走起

巧用 Logstash 实现服务端文档 Join

首先定义一个简单的 Pipeline:

input { 

   stdin{}

}

filter {

    json {

        source => message

    }   

        if ![thread_id] {

                uuid {

                  target    => "thread_id"

                  overwrite => true

                }

        }

}

output {

        stdout{}

        elasticsearch {

            hosts => ["http://127.0.0.1:9200"]

            index => "test234"

            document_id => "%{thread_id}"

            doc_as_upsert => true

            action => "update"

        }

}

简单说明一下,使用的 stdin 终端数据输入作为测试,经过 json filter 解析成结构化的字段,然后如果文档里面没有 thread_id 这个字段会自动通过 uuid filter 生成一个,同时我们自定义了文档的唯一 id,最重要的是在 output 的地方使用参数设置为 update 操作,进行 upsert 合并插入。

巧用 Logstash 实现服务端文档 Join

调试一下!看看结果

启动 Logstash,将上面的测试数据贴到终端下面执行,查看 Elasticsearch 的索引数据:

{

  "took" : 0,

  "timed_out" : false,

  "_shards" : {

    "total" : 1,

    "successful" : 1,

    "skipped" : 0,

    "failed" : 0

  },

  "hits" : {

    "total" : {

      "value" : 3,

      "relation" : "eq"

    },

    "max_score" : 1.0,

    "hits" : [

      {

        "_index" : "test234",

        "_type" : "_doc",

        "_id" : "345",

        "_score" : 1.0,

        "_source" : {

          "@version" : "1",

          "thread_id" : "345",

          "code" : "1000",

          "@timestamp" : "2019-12-27T15:28:23.482Z",

          "message" : "message E12",

          "host" : "H2-Linux"

        }

      },

      {

        "_index" : "test234",

        "_type" : "_doc",

        "_id" : "123",

        "_score" : 1.0,

        "_source" : {

          "@version" : "1",

          "thread_id" : "123",

          "@timestamp" : "2019-12-27T15:28:23.482Z",

          "message" : """{"thread_id":"123", "a":"message A11"}""",

          "a" : "message A11",

          "host" : "H2-Linux"

        }

      },

      {

        "_index" : "test234",

        "_type" : "_doc",

        "_id" : "234",

        "_score" : 1.0,

        "_source" : {

          "@version" : "1",

          "thread_id" : "234",

          "@timestamp" : "2019-12-27T15:28:23.483Z",

          "message" : """{"thread_id":"234", "f":"message D11"}""",

          "b" : "message C11",

          "host" : "H2-Linux",

          "f" : "message D11"

        }

      }

    ]

  }

}

可以看到,Elasticsearch 端的数据已经聚合好了,只有 3 条数据,相关的字段都好好的躺在里面。

小结一下吧!

通过设置 Elasticsearch 的 Output,使用 upsert 特性即可实现文档在服务端的 Join,可以用来解决一些特殊的需要将文档进行聚合的场景,使用起来非常简单方便,不过要注意的是,upsert 相比普通的 index 操作是需要产生额外的开销的,因为更新需要先拉取再索引,如果日志里面不是所有的文档需要 upsert,我们可以在 filter 阶段打上标签,在 Output 阶段通过标签进行路由选择是否进行 upsert 操作。


[1]相关链接:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-upsert



小小Tips,希望对你有帮助,记得点赞/转发+“在看”

以上是关于巧用 Logstash 实现服务端文档 Join的主要内容,如果未能解决你的问题,请参考以下文章

巧用ELK快速实现网站流量监控可视化

java是如何实现客服在线聊天功能的?

第十期基于 ApolloKoa 搭建 GraphQL 服务端

使用 WPF + Chrome 内核实现 在线客服系统 的复合客服端程序

开源软件介绍|Logstash 6.1.2 发布,开源服务端数据处理流程

F4 workerman 长连接绑定用户id实现一对一客服聊天