Elasticsearch: transform data into an array

Original title: elasticsearch transform data to array
Posted: 2020-12-24 09:05:30

Question:

I want to use Elasticsearch to calculate user retention:

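1. Write the event logs to a default index.
2. Transform them into an intermediate, entity-centric index grouped by acc.
3. Use a filters aggregation (or adjacency_matrix) to compute the per-day intersections.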

The problem is step 2: how do I write a transform that produces the intermediate index cleanly?

Input event log:

POST _bulk
{"index": {"_index": "test.u1"}}
{"acc": 1001, "event": "create", "timestamp": "2020-08-01 09:00"}
{"index": {"_index": "test.u1"}}
{"acc": 1001, "event": "login", "timestamp": "2020-08-01 10:00"}
{"index": {"_index": "test.u1"}}
{"acc": 1001, "event": "login", "timestamp": "2020-08-02 10:00"}
{"index": {"_index": "test.u1"}}
{"acc": 1001, "event": "login", "timestamp": "2020-08-03 10:00"}
{"index": {"_index": "test.u1"}}
{"acc": 1002, "event": "create", "timestamp": "2020-08-01 10:00"}
{"index": {"_index": "test.u1"}}
{"acc": 1002, "event": "login", "timestamp": "2020-08-02 10:00"}
{"index": {"_index": "test.u1"}}
{"acc": 1002, "event": "login", "timestamp": "2020-08-02 11:00"}
{"index": {"_index": "test.u1"}}
{"acc": 1003, "event": "create", "timestamp": "2020-08-01 10:00"}
{"index": {"_index": "test.u1"}}
{"acc": 1004, "event": "create", "timestamp": "2020-08-02 10:00"}
{"index": {"_index": "test.u1"}}
{"acc": 1004, "event": "login", "timestamp": "2020-08-02 10:00"}
{"index": {"_index": "test.u1"}}
{"acc": 1004, "event": "login", "timestamp": "2020-08-03 10:00"}

Expected intermediate index:

{"acc": 1001, "create": "08-01", "login": ["08-01", "08-02", "08-03"]}
{"acc": 1002, "create": "08-01", "login": ["08-02"]}
{"acc": 1003, "create": "08-01", "login": []}
{"acc": 1004, "create": "08-02", "login": ["08-02", "08-03"]}

How can I generate the "login" array? Any better design is also welcome.
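To pin down the target shape independently of Elasticsearch, the grouping that step 2 has to perform can be sketched in plain Python. This is only an illustration of the desired result, not a transform: the function name `build_entities` is hypothetical, and the events are the sample rows from the bulk request above.

```python
# Sample events copied from the question: (acc, event, timestamp).
EVENTS = [
    (1001, "create", "2020-08-01 09:00"),
    (1001, "login", "2020-08-01 10:00"),
    (1001, "login", "2020-08-02 10:00"),
    (1001, "login", "2020-08-03 10:00"),
    (1002, "create", "2020-08-01 10:00"),
    (1002, "login", "2020-08-02 10:00"),
    (1002, "login", "2020-08-02 11:00"),
    (1003, "create", "2020-08-01 10:00"),
    (1004, "create", "2020-08-02 10:00"),
    (1004, "login", "2020-08-02 10:00"),
    (1004, "login", "2020-08-03 10:00"),
]


def build_entities(events):
    """Group events by account: earliest create day plus distinct login days."""
    entities = {}
    for acc, event, timestamp in events:
        day = timestamp[:10]  # "2020-08-01 09:00" -> "2020-08-01"
        entity = entities.setdefault(acc, {"create": None, "login": set()})
        if event == "create":
            # Keep the earliest create day if an account has several.
            if entity["create"] is None or day < entity["create"]:
                entity["create"] = day
        elif event == "login":
            entity["login"].add(day)  # the set removes same-day duplicates
    # Emit the MM-dd form used in the question, with login days sorted.
    return {
        acc: {
            "create": e["create"][5:] if e["create"] else None,
            "login": [d[5:] for d in sorted(e["login"])],
        }
        for acc, e in sorted(entities.items())
    }


if __name__ == "__main__":
    for acc, entity in build_entities(EVENTS).items():
        print(acc, entity)
```

Account 1002 logs in twice on 2020-08-02, which is why the login days are collected in a set before being turned into a list.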

Comments:

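Are you using the X-Pack transform module? elastic.co/guide/en/elasticsearch/reference/current/…

@SahilGupta Yes. The "create" date is easy: aggs.filter("event=login").min()

The intermediate data is easy. I don't quite understand your third step; maybe you can use this? elastic.co/guide/en/elasticsearch/reference/current/…

Answer 1: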

Solved with aggs.scripted_metric. The complete transform:

PUT _transform/tr-acc2-ar2
{
  "source": {
    "index": [
      "mhlog2-*"
    ]
  },
  "pivot": {
    "group_by": {
      "msg.#account_id": {
        "histogram": {
          "field": "msg.#account_id",
          "interval": "1"
        }
      }
    },
    "aggregations": {
      "create": {
        "filter": {
          "term": {
            "msg.#event_name.keyword": "createRole"
          }
        },
        "aggs": {
          "time": {
            "min": {
              "field": "@timestamp"
            }
          }
        }
      },
      "login": {
        "filter": {
          "term": {
            "msg.#event_name.keyword": "login"
          }
        },
        "aggs": {
          "days": {
            "scripted_metric": {
              "init_script": "state.days=[:];",
              "map_script": "state.days[doc['@timestamp'].value.toString('yyyy-MM-dd')]=1;",
              "combine_script": "return state",
              "reduce_script": "def days = [:]; def array = []; for (s in states) { for (d in s.days.keySet()) { days[d]=1; } } for (d in days.keySet()) { array.add(d); } return array;"
            }
          }
        }
      }
    }
  },
  "dest": {
    "index": "idx.tr.acc2.ar2"
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  }
}

Generated intermediate index:

_id : AAAAAAAA
_index : acc.array  
_score : 0
_type : _doc    
create.time : Aug 18, 2020 @ 11:17:43.000   
login.days : 2020-08-18T00:00:00.000Z, 2020-08-19T00:00:00.000Z, 2020-08-20T00:00:00.000Z   
msg.#account_id : 12333212323
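To make the four phases of the scripted_metric aggregation easier to follow, here is a rough emulation in plain Python. It is a sketch of the data flow only, not Painless and not Elasticsearch code: the two-shard split and the sample timestamps are made up for illustration, and the function names simply mirror the script names in the transform.

```python
def init_script():
    # state.days=[:]  -> one empty map per shard
    return {"days": {}}


def map_script(state, timestamp):
    # Runs once per matching document: record the calendar day as a map key.
    state["days"][timestamp[:10]] = 1


def combine_script(state):
    # Runs once per shard: hand the per-shard state to the coordinator.
    return state


def reduce_script(states):
    # Runs once on the coordinating node: merge the shard maps, emit a list.
    days = {}
    array = []
    for s in states:
        for d in s["days"]:
            days[d] = 1
    for d in days:
        array.append(d)
    return array


def run_scripted_metric(shards):
    """Drive the four phases over a list of shards (each a list of timestamps)."""
    states = []
    for docs in shards:
        state = init_script()
        for timestamp in docs:
            map_script(state, timestamp)
        states.append(combine_script(state))
    return reduce_script(states)


# Hypothetical login timestamps for one account, spread over two shards.
SHARDS = [
    ["2020-08-18T11:17:43", "2020-08-19T08:00:00"],
    ["2020-08-19T21:30:00", "2020-08-20T07:45:00"],
]

if __name__ == "__main__":
    print(run_scripted_metric(SHARDS))
```

Using a map keyed by day in each shard, and again in the reduce step, is what removes duplicate days before the final array is built.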

Finally, with a KQL filter it is easy to find the users created on 2020-08-18 who were retained on 2020-08-19:

create.time: 2020-08-18 AND login.days: 2020-08-19
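The filter above selects the retained users; the retention rate is that count divided by the size of the creation-day cohort. A minimal Python sketch of the arithmetic, using the sample accounts from the question rather than real index data (the `retention` helper and the document layout are assumptions made for illustration):

```python
from datetime import date, timedelta

# Intermediate documents for the question's sample data, with full dates.
ENTITIES = [
    {"acc": 1001, "create": date(2020, 8, 1),
     "login": [date(2020, 8, 1), date(2020, 8, 2), date(2020, 8, 3)]},
    {"acc": 1002, "create": date(2020, 8, 1), "login": [date(2020, 8, 2)]},
    {"acc": 1003, "create": date(2020, 8, 1), "login": []},
    {"acc": 1004, "create": date(2020, 8, 2),
     "login": [date(2020, 8, 2), date(2020, 8, 3)]},
]


def retention(entities, create_day, offset_days=1):
    """Share of accounts created on create_day that log in offset_days later."""
    target = create_day + timedelta(days=offset_days)
    cohort = [e for e in entities if e["create"] == create_day]
    if not cohort:
        return None  # no accounts were created on that day
    retained = [e for e in cohort if target in e["login"]]
    return len(retained) / len(cohort)


if __name__ == "__main__":
    print(retention(ENTITIES, date(2020, 8, 1)))      # next-day retention
    print(retention(ENTITIES, date(2020, 8, 1), 2))   # day-2 retention
```

For the 2020-08-01 cohort (accounts 1001, 1002, 1003), two accounts log in on 2020-08-02, so next-day retention is 2/3.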

Comments:

doc['@timestamp'].value.toString('yyyy-MM-dd') causes a time-zone problem at the day boundary.
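The day-boundary issue can be illustrated outside Elasticsearch. The sketch below assumes the script formats the timestamp in UTC and that users are in a fixed UTC+8 zone; both are assumptions for illustration, and `day_in_zone` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

UTC8 = timezone(timedelta(hours=8))  # assumed local zone, for illustration only


def day_in_zone(moment, zone):
    """Format an aware datetime as yyyy-MM-dd in the given zone."""
    return moment.astimezone(zone).strftime("%Y-%m-%d")


# A login at 07:30 local time on 2020-08-19 is 23:30 UTC on 2020-08-18.
login = datetime(2020, 8, 19, 7, 30, tzinfo=UTC8)

if __name__ == "__main__":
    print(day_in_zone(login, timezone.utc))  # day as seen in UTC
    print(day_in_zone(login, UTC8))          # day as seen by the user
```

A login early in the local morning is therefore bucketed into the previous day when formatted in UTC, which would shift the retention counts by one day for those users.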
