Exporting Elasticsearch Data to CSV with Python
Posted by illusioned
Since we hopped on the Elastic Stack bandwagon, most of our logs and monitoring data now get dumped into ELK.
You may have hit the same problem: after painstakingly building a query in Kibana, you discover that Kibana can't export the results as a report.
So here, with the help of a Python demo found online, let me walk through exporting Elasticsearch data to CSV.
First, let's look at how to write the query for the export.
Note 1: for any ES query question, it's best to verify the query in Kibana's built-in Dev Tools first.
Example:
## Query nginxlog for status code 200 within the last 5 minutes (i.e. "now-5m"; careful: in ES date math a lowercase m means minutes while an uppercase M means months. Also note the baseline here is @timestamp, the time the data entered ES, not the log's own time.)
GET /nginxlog-2019.11.22/_search
{
  "query": {
    "match": { "statuscode": "200" }
  },
  "aggs": {
    "@timestamp": {
      "filter": {
        "range": { "@timestamp": { "from": "now-5m" } }
      }
    }
  },
  "size": 2,
  "from": 0,
  "_source": [ "type", "verb", "upstream_response_time", "statuscode" ]
}
Explanation:
① "_source" specifies the output fields; in SQL terms it's roughly select "type", "verb", "upstream_response_time", "statuscode" from nginxlog;
② "size" caps the output, like MySQL's LIMIT; (only 2 records are returned here for demonstration)
③ "from" is the starting offset, also part of the LIMIT analogy: together they behave like LIMIT 0,2
Note: per the official docs, from defines the offset into the result set and size the number of hits to return; (the above is a loose analogy to make the idea concrete — see the Python paging sketch right below)
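For reference, here is a minimal Python sketch of the same from/size paging through the official elasticsearch-py client. The host is a placeholder and this snippet is not from the original post; it just makes the LIMIT analogy concrete:

from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])  # placeholder host

# Page through hits two at a time, like MySQL's LIMIT 0,2 / LIMIT 2,2 / ...
for offset in (0, 2, 4):
    resp = es.search(index="nginxlog-2019.11.22", body={
        "query": {"match": {"statuscode": "200"}},
        "from": offset,
        "size": 2,
        "_source": ["type", "verb", "upstream_response_time", "statuscode"],
    })
    for hit in resp["hits"]["hits"]:
        print(offset, hit["_source"])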
## Query response
{ "took": 17, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 1448153, "max_score": 1, "hits": [ { "_index": "nginxlog-2019.11.22", "_type": "log", "_id": "AW6QrSYLcN5cQJSYWJEL", "_score": 1, "_source": { "statuscode": 200, "verb": "POST", "upstream_response_time": 0.014, "type": "log" } }, { "_index": "nginxlog-2019.11.22", "_type": "log", "_id": "AW6QrSYLcN5cQJSYWJEM", "_score": 1, "_source": { "statuscode": 200, "verb": "POST", "upstream_response_time": 0.069, "type": "log" } } ] }, "aggregations": { "@timestamp": { "doc_count": 0 } } }
## As above; the following queries are not explained in detail, only a brief note on each usage. For the full specification, please consult the official docs. Time-range queries:
## Variant ①
GET /nginxlog-2019.11.22/_search { "query": { "match" : { "statuscode" : "200" } }, "aggs":{ "@timestamp": { "filter": { "range" : { "@timestamp" : { "gte" : "now-1m", "lt" : "now" } } } } }, "size": 100, "from": 0, "_source": [ "log_timestamp","statuscode", "@timestamp"] }
## Variant ②
GET /nginxlog-2019.11.22/_search
{
  "query": {
    "range": { "@timestamp": { "gte": "now-1m", "lt": "now" } },
    "match": { "statuscode": "200" }
  },
  "size": 100,
  "_source": [ "first_name", "last_name", "log_timestamp", "@timestamp", "statuscode" ]
}
## (Caution: "query" accepts only a single top-level clause, so most ES versions will reject two siblings like this; combine clauses with bool as in Variant ③ below.)
The comparison operators, for reference:
eq equal
ne, neq not equal
gt greater than
lt less than
gte, ge greater than or equal
lte, le less than or equal
(Of these, the range query itself only understands gt, gte, lt and lte; see the sketch below.)
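A minimal range clause written as a Python dict, using explicit bounds instead of date math (the timestamps are made-up placeholders, not from the original post):

range_body = {
    "query": {
        "range": {
            "@timestamp": {
                "gte": "2019-11-22T00:00:00",  # >= lower bound
                "lt": "2019-11-22T00:05:00"    # <  upper bound
            }
        }
    }
}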
## Variant ③ combined query:
GET /nginxlog-2019.01.24/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "statuscode": "200" } },
        { "range": { "@timestamp": { "gte": "now-1m", "lt": "now" } } }
      ]
    }
  },
  "size": 100,
  "_source": [ "first_name", "last_name", "log_timestamp", "@timestamp", "statuscode" ]
}
## (Note: a JSON object cannot carry two "must" keys, so both clauses go into a single "must" array.)
## Once you're comfortable with the queries above, it's time to write the Python code.
# -*- coding: utf-8 -*-
"""
Created on Thu Jun 7 10:08:02 2018

@author: illusioned
"""
from elasticsearch import Elasticsearch
from elasticsearch import helpers


def GetValue():
    # Note the timeout: without an explicit large value, a big scan
    # will time out and fail every single time.
    es = Elasticsearch(["192.168.10.121:9200"], timeout=99999)
    # Notice how similar this is to the queries we tested in Dev Tools.
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"action": "visit"}},
                    {"match": {"survey_id": "471101"}},
                    # Caveat: a match query treats "http*" literally and
                    # does not expand the *; use a wildcard query for that.
                    {"match": {"ref": "http*"}}
                ]
            }
        },
        "size": 100,
        "_source": ["@timestamp", "action", "survey_id", "ref", "user_id"]
    }
    # Doesn't it read just like SQL:
    # select "@timestamp","action","survey_id","ref","user_id" from jsonlog-2019.11.*
    # where action="visit" and survey_id="471101" and ref="http*"
    scanResp = helpers.scan(client=es, query=query, scroll="3m",
                            index='jsonlog-2019.11.*', doc_type='log',
                            timeout="10m")
    for k in scanResp:
        yield k


def write_file(k):
    # Append the selected fields to the CSV; mind the encoding (gbk here,
    # so Excel on a Chinese-locale Windows box opens the file correctly).
    with open('China_news.csv', 'ab') as f:
        k = dict(k)
        f.write(str(k['_source']['@timestamp']).encode('gbk'))
        f.write(b',')
        f.write(str(k['_source']['action']).encode('gbk'))
        f.write(b',')
        f.write(str(k['_source']['survey_id']).encode('gbk'))
        f.write(b',')
        f.write(str(k['_source']['ref']).encode('gbk'))
        f.write(b',')
        f.write(str(k['_source']['user_id']).encode('gbk'))
        f.write(b'\n')


if __name__ == "__main__":
    list1 = GetValue()
    for index, k in enumerate(list1, 1):
        write_file(k)
        print('Exporting record ' + str(index))
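A small aside on the byte-level writes above: Python's standard csv module handles quoting and escaping for you, which matters once a field contains a comma. A minimal alternative sketch for write_file (not from the original post; utf-8-sig is used here instead of gbk so Excel auto-detects the encoding, and the field list matches the script above):

import csv

FIELDS = ["@timestamp", "action", "survey_id", "ref", "user_id"]

def write_rows(hits, path="China_news.csv"):
    # newline="" is required so the csv module controls line endings itself
    with open(path, "a", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        for k in hits:
            writer.writerow([k["_source"].get(field, "") for field in FIELDS])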
=====================================================
Let me also share a rather fun ElasticSearch trick.
Anyone familiar with databases knows that a SQL UPDATE must carry a condition and cannot update the very rows it is selecting from; normally that raises an error.
ElasticSearch, however, genuinely lets you do this: query and update in one shot, as below.
POST /zipkin:span-2019-03-05/span/_update_by_query?conflicts=proceed
{
  "script": {
    "source": "ctx._source['parentId']='bf98782dbd7e9304';"
  },
  "query": {
    "term": { "parentId": "352006966a095911" }
  }
}
## Find the documents where "parentId" = "352006966a095911", then update it to "bf98782dbd7e9304";
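The same call can be issued from Python. A sketch, assuming an elasticsearch-py version that exposes update_by_query (values taken from the request above; the es client is the one created earlier):

resp = es.update_by_query(
    index="zipkin:span-2019-03-05",
    body={
        "script": {"source": "ctx._source['parentId']='bf98782dbd7e9304';"},
        "query": {"term": {"parentId": "352006966a095911"}}
    },
    conflicts="proceed"  # keep going instead of aborting on version conflicts
)
print(resp["updated"], "documents updated")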