Exporting Elasticsearch Data to CSV with Python

Posted by illusioned


  Since we jumped on the Elastic Stack bandwagon, most of our logs and monitoring data get shipped off to ELK.
  You may have run into the same problem: after painstakingly building a query in Kibana, you discover Kibana has no way to export the results as a report.
  So, starting from a Python demo found online, this post walks through exporting Elasticsearch source data to CSV with Python.
 
First, let's look at how to write the query for the export.
Note 1: for working out ES queries, Kibana's built-in Dev Tools console is the recommended place to validate them.

Example:

## Query nginxlog for statuscode 200 within the last 5 minutes, i.e. "now-5m". Careful: lowercase m means minutes, uppercase M means months, so "now-5M" would be 5 months! Also note the time basis here is @timestamp, the time the data entered ES, not the log's own time.
GET /nginxlog-2019.11.22/_search
{
  "query": {
    "match": { "statuscode": "200" }
  },
  "aggs": {
    "@timestamp": {
      "filter": {
        "range": { "@timestamp": { "from": "now-5m" } }
      }
    }
  },
  "size": 2,
  "from": 0,
  "_source": [ "type", "verb", "upstream_response_time", "statuscode" ]
}

Explanation:
① "_source" selects the output fields; in SQL terms, roughly select "type", "verb", "upstream_response_time", "statuscode" from nginxlog;
② "size" caps the output, like MySQL's LIMIT; (here only 2 records are returned, for the demo)
③ "from" sets the starting offset, also like MySQL's LIMIT; together they behave like LIMIT 0,2
 Note: per the official docs, from defines the offset into the result set and size defines how many hits to return (the above is a loose analogy to make the concepts concrete).
## The query returns:
{
  "took": 17,
  "timed_out": false,
  "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 },
  "hits": {
    "total": 1448153,
    "max_score": 1,
    "hits": [
      {
        "_index": "nginxlog-2019.11.22",
        "_type": "log",
        "_id": "AW6QrSYLcN5cQJSYWJEL",
        "_score": 1,
        "_source": {
          "statuscode": 200,
          "verb": "POST",
          "upstream_response_time": 0.014,
          "type": "log"
        }
      },
      {
        "_index": "nginxlog-2019.11.22",
        "_type": "log",
        "_id": "AW6QrSYLcN5cQJSYWJEM",
        "_score": 1,
        "_source": {
          "statuscode": 200,
          "verb": "POST",
          "upstream_response_time": 0.069,
          "type": "log"
        }
      }
    ]
  },
  "aggregations": { "@timestamp": { "doc_count": 0 } }
}
## The same idea applies below; the remaining examples are only sketched briefly to show the usage. For the precise rules, see the official documentation.
Time-range queries:
## Approach ①
GET /nginxlog-2019.11.22/_search
{
  "query": {
    "match": { "statuscode": "200" }
  },
  "aggs": {
    "@timestamp": {
      "filter": {
        "range": { "@timestamp": { "gte": "now-1m", "lt": "now" } }
      }
    }
  },
  "size": 100,
  "from": 0,
  "_source": [ "log_timestamp", "statuscode", "@timestamp" ]
}
## Approach ②: put the time range directly in the query as a bool filter (two bare clauses side by side under "query" are not valid, so they must be wrapped in bool)
GET /nginxlog-2019.11.22/_search
{
  "query": {
    "bool": {
      "must": { "match": { "statuscode": "200" } },
      "filter": { "range": { "@timestamp": { "gte": "now-1m", "lt": "now" } } }
    }
  },
  "size": 100,
  "_source": [ "first_name", "last_name", "log_timestamp", "@timestamp", "statuscode" ]
}
Range operators:
gt      greater than
lt      less than
gte     greater than or equal to
lte     less than or equal to
(These four are what ES range queries accept; there is no eq/ne operator here — use match or term for equality.)
## Approach ③, a combined (bool) query. Note: the two conditions go into one "must" array; repeating the "must" key in a single object is not valid JSON, as shown below.
GET /nginxlog-2019.01.24/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "statuscode": "200" } },
        { "range": { "@timestamp": { "gte": "now-1m", "lt": "now" } } }
      ]
    }
  },
  "size": 100,
  "_source": [ "first_name", "last_name", "log_timestamp", "@timestamp", "statuscode" ]
}
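A side note on why the conditions must live in a "must" array: the original version of this query repeated the "must" key inside one object, but a JSON object cannot carry duplicate keys, and most parsers silently keep just one of them. A quick check with Python's standard library shows the effect:

import json

# Two "must" keys in one object: the parser keeps only the last one,
# so the first condition is silently dropped.
doc = '{"must": {"match": "a"}, "must": {"range": "b"}}'
print(json.loads(doc))  # {'must': {'range': 'b'}}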

 

## Now that the query syntax above is familiar, let's write the Python code:

# -*- coding: utf-8 -*-
"""
Created on Thu Jun  7 10:08:02 2018
@author: illusioned
"""
from elasticsearch import Elasticsearch
from elasticsearch import helpers

def get_values():
    # Note the timeout here: if it is not raised explicitly, the query is guaranteed to time out with an error.
    es = Elasticsearch(["192.168.10.121:9200"], timeout=99999)
    # Notice: the query body is almost identical to the Dev Tools queries above.
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"action": "visit"}},
                    {"match": {"survey_id": "471101"}},
                    {"match": {"ref": "http*"}}
                ]
            }
        },
        "size": 100,
        "_source": ["@timestamp", "action", "survey_id", "ref", "user_id"]
    }
    # Doesn't it look just like the SQL statement:
    #   select "@timestamp","action","survey_id","ref","user_id" from jsonlog-2019.11.*
    #       where action="visit" and survey_id="471101" and ref="http*"
    scanResp = helpers.scan(client=es, query=query, scroll="3m",
                            index='jsonlog-2019.11.*', doc_type='log', timeout="10m")
    for k in scanResp:
        yield k

def write_file(k):
    # Write out the fields we want to export; mind the encoding.
    with open('China_news.csv', 'ab') as f:
        k = dict(k)
        f.write(str(k['_source']['@timestamp']).encode('gbk'))
        f.write(b',')
        f.write(str(k['_source']['action']).encode('gbk'))
        f.write(b',')
        f.write(str(k['_source']['survey_id']).encode('gbk'))
        f.write(b',')
        f.write(str(k['_source']['ref']).encode('gbk'))
        f.write(b',')
        f.write(str(k['_source']['user_id']).encode('gbk'))
        f.write(b'\n')

if __name__ == "__main__":
    for index, k in enumerate(get_values(), 1):
        write_file(k)
        print('Exporting record ' + str(index))
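One caveat about the hand-rolled writer above: joining fields with bare commas breaks as soon as a value itself contains a comma, and 'gbk' cannot encode characters outside that codepage. Below is a safer variant, a minimal sketch using Python's standard csv module; the field list and file name are kept from the script above, and 'utf-8-sig' is an assumption chosen so that Excel auto-detects the encoding:

import csv

FIELDS = ['@timestamp', 'action', 'survey_id', 'ref', 'user_id']

def write_rows(hits, path='China_news.csv'):
    # newline='' lets the csv module control line endings itself
    with open(path, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.writer(f)       # automatically quotes fields that contain commas
        writer.writerow(FIELDS)      # header row
        for k in hits:
            src = k['_source']
            writer.writerow([src.get(field, '') for field in FIELDS])

Calling write_rows(get_values()) then replaces the whole write_file loop in __main__.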
 

=====================================================

To wrap up, here is a rather amusing trick in Elasticsearch.

Anyone familiar with databases knows that a SQL UPDATE must carry a condition and that a table cannot simply update itself from a query on itself; under normal circumstances that errors out.

Elasticsearch, however, really does let you do this, querying and updating in one request:

POST /zipkin:span-2019-03-05/span/_update_by_query?conflicts=proceed
{
  "script": {
    "source": "ctx._source['parentId']='bf98782dbd7e9304';"
  },
  "query": {
    "term": { "parentId": "352006966a095911" }
  }
}
## The term query finds the documents where "parentId" = 352006966a095911, then the script updates that field to "bf98782dbd7e9304";
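The same call can be issued from Python. Here is a minimal sketch with elasticsearch-py, assuming a client version that exposes update_by_query; the host and index names are taken from the examples above:

from elasticsearch import Elasticsearch

es = Elasticsearch(["192.168.10.121:9200"])
resp = es.update_by_query(
    index="zipkin:span-2019-03-05",
    conflicts="proceed",   # keep going if a document is changed by someone else mid-update
    body={
        "script": {"source": "ctx._source['parentId']='bf98782dbd7e9304';"},
        "query": {"term": {"parentId": "352006966a095911"}}
    }
)
print(resp["updated"])     # number of documents rewritten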

 

 
