python 查询Elasticsearch的小例子

Posted **小君哥**

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 查询Elasticsearch的小例子相关的知识,希望对你有一定的参考价值。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from sfo_common.agent import Agent
from sfo_common.import_common import *


class ElkLog(object):
    """
    Pulls swift-proxy access logs from Elasticsearch and stores per-cluster
    average response times (overall and per HTTP method) in the database.
    """

    # HTTP verbs that are aggregated individually into SfoClusterTps columns.
    _METHODS = ('HEAD', 'GET', 'PUT', 'POST', 'DELETE')

    def __init__(self):
        pass

    def get_elk_log_json(self):
        """
        Query each cluster's ``<cluster>-swift-proxy-<YYYY.MM.DD>`` index for
        the last five minutes of records and persist the average request time
        in milliseconds (overall and per HTTP method) as one SfoClusterTps
        row per cluster.

        :return: None; results are written to the database session.
        """
        try:
            day = time.strftime("%Y.%m.%d", time.localtime(time.time()))
            clusters = config.elk_index_name.split(',')
            for cluster in clusters:
                index_name = "{}-swift-proxy-{}".format(cluster, day)
                req_url = '{}{}/_search?pretty'.format(config.elk_server_url, index_name)
                headers = {'content-type': "application/json"}
                # Query window: [now - 5 min, now], converted to UTC to match
                # the @timestamp field stored by logstash.
                l_time = datetime.datetime.now() + datetime.timedelta(minutes=-5)
                now_time = util.local2utc(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
                now_time_5m = util.local2utc(l_time.strftime('%Y-%m-%d %H:%M:%S.%f'))
                body = {
                    "query": {
                        "bool": {
                            "must": {"match_all": {}},
                            "filter": {
                                "range": {
                                    "@timestamp": {
                                        "gte": now_time_5m,
                                        "lte": now_time
                                    }
                                }
                            }
                        }
                    },
                    "size": 10000,
                    "sort": {"@timestamp": {"order": "asc"}},
                    "_source": ["status", "method", "client_ip", "remote_ip",
                                "timestamp", "request_time", "@timestamp"]
                }
                # Explicit timeout so a dead ES node cannot hang the agent forever.
                response = requests.post(req_url, data=json.dumps(body),
                                         headers=headers, timeout=30)
                if response.status_code != 200:
                    continue

                # json.loads(..., encoding=...) was removed in Python 3.9;
                # requests already decodes response.text for us.
                res_data = json.loads(response.text)
                hits = res_data.get('hits') if isinstance(res_data, dict) else None
                if not hits:
                    continue
                total = hits.get('total', 0)
                records = hits.get('hits') or []
                if not records or total <= 0:
                    continue

                total_time = 0.0
                method_time = {m: 0.0 for m in self._METHODS}
                method_count = {m: 0 for m in self._METHODS}
                for obj in records:
                    if not isinstance(obj, dict) or '_source' not in obj:
                        continue
                    source = obj['_source']
                    try:
                        req_time = float(source['request_time'])
                    except (KeyError, TypeError, ValueError):
                        # Record lacks a usable request_time; still counts
                        # toward the per-method hit count, as before.
                        req_time = None
                    if req_time is not None:
                        total_time += req_time
                    method = str(source.get('method', '')).strip().upper()
                    if method in method_count:
                        method_count[method] += 1
                        if req_time is not None:
                            method_time[method] += req_time

                def _avg_ms(time_sum, count):
                    # Average in milliseconds formatted like the legacy
                    # output ('%.2f' string), or 0 when there were no hits.
                    return '%.2f' % (time_sum / count * 1000) if count > 0 else 0

                tps = SfoClusterTps()
                tps.guid = str(uuid.uuid1())
                tps.cluster_name = cluster
                tps.avg_time = _avg_ms(total_time, total)
                tps.head_time = _avg_ms(method_time['HEAD'], method_count['HEAD'])
                tps.get_time = _avg_ms(method_time['GET'], method_count['GET'])
                tps.put_time = _avg_ms(method_time['PUT'], method_count['PUT'])
                tps.post_time = _avg_ms(method_time['POST'], method_count['POST'])
                tps.delete_time = _avg_ms(method_time['DELETE'], method_count['DELETE'])
                tps.add_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
                db.session.add(tps)
                db.session.commit()
        except Exception as ex:
            logger.exception("get_elk_log_json function execute exception:" + str(ex))
        finally:
            db.session.close()
            db.session.remove()

#schedule tasks
def get_elklog_json_schl(executor):
    """
    Submit one ELK log-analysis run to the given thread pool.

    :param executor: a concurrent.futures executor used to run the job
    :return: None
    """
    try:
        worker = ElkLog()
        executor.submit(worker.get_elk_log_json)
    except Exception as ex:
        logger.exception("get_elklog_json_schl function execute exception:" + str(ex))

class ElklogUnitAgnet(Agent):
    """Daemon agent that periodically schedules ELK log analysis jobs."""

    def __init__(self, pidfile):
        Agent.__init__(self, pidfile)

    def run(self):
        """
        Daemon main loop: register the log-analysis job with the scheduler,
        fire it once immediately, then poll pending jobs forever.
        """
        try:
            sys.stdout.flush()
            hostname = socket.getfqdn()
            hostip = socket.gethostbyname(hostname)
            logger.info("hostname is {}, ip is {}".format(hostname, hostip))
            # One shared pool; each scheduler tick submits a job to it.
            with ThreadPoolExecutor(config.thread_workers) as executor:
                job = schedule.every(config.upload_refresh).seconds
                job.do(get_elklog_json_schl, executor)
                schedule.run_all(0)  # run once right away, no startup delay
                while True:
                    schedule.run_pending()
                    time.sleep(0.1)
        except Exception as ex:
            logger.exception("elk log agent run exception:" + str(ex))

def main():
    """
    CLI entry point: ``<prog> elklog start|stop`` controls the agent daemon.

    Exits with status 2 on unknown commands or wrong usage.
    """
    agent = ElklogUnitAgnet(config.elklog_agnet_pfile)
    try:
        if len(sys.argv) == 3:
            if 'elklog' == sys.argv[1]:
                # BUGFIX: the original used two independent `if` statements,
                # so `start` also fell into the "Unknown command" else-branch
                # and exited with status 2 right after starting the agent.
                if 'start' == sys.argv[2]:
                    agent.start()
                elif 'stop' == sys.argv[2]:
                    agent.stop()
                else:
                    print("Unknown command")
                    sys.exit(2)
            else:
                print("usage: %s" % (sys.argv[0],))
                sys.exit(2)
        else:
            # Previously a silent no-op; report usage instead.
            print("usage: %s" % (sys.argv[0],))
            sys.exit(2)
    except Exception as ex:
        logger.exception("elk log process run exception:" + str(ex))

# Script entry guard (fixed scraping artifact: \' -> ').
if __name__ == '__main__':
    main()



###########################################################################################
更多查询方式接口:

查询一条记录
curl -H "Content-Type: application/json" -X POST 'http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty' -d '{"query": { "match_all": {} },"size": 1}'
查询offset为20的记录
curl -H "Content-Type: application/json" -X POST 'http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty' -d '{"query": { "match": { "offset": 20 } }}'
查询结果只返回指定的字段
curl -H "Content-Type: application/json" -X POST 'http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty' -d '{"query": { "match_all": {} },"_source": ["host", "message"]}'
查询结果排序
curl -H "Content-Type: application/json" -X POST 'http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty' -d '{"query": { "match_all": {} },"_source": ["offset","host", "message"],"sort": { "offset": { "order": "desc" } }}'
返回从10开始的10条记录
curl -H "Content-Type: application/json" -X POST 'http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty' -d '{"query": { "match_all": {} },"from": 10,"size": 10,"_source": ["offset","host", "message"],"sort": { "offset": { "order": "desc" } }}'

集群健康状态查询:

curl '192.168.1.1:9200/_cat/health?v'

查询索引列表:

curl '192.168.1.1:9200/_cat/indices?v'

查询集群节点列表:

curl '192.168.1.1:9200/_cat/nodes?v'

创建索引:

curl -XPUT '192.168.1.1:9200/test-index?pretty'

注意:索引名中不能使用大写,否则会报错:

Could not index event to Elasticsearch.     "reason"=>"Invalid index name [iTech-swift-proxy-2018.11.08], must be lowercase",

删除索引:

curl -XDELETE '192.168.1.1:9200/test-index?pretty'

向索引中插入数据:

curl -XPUT '192.168.1.1:9200/test-index/<_type>/<_id>?pretty' -d '{"name": "test name"}'

 获取插入的数据:

curl -XGET '192.168.1.1:9200/test-index/<_type>/<_id>?pretty'

(<_type>和<_id>用实际需要插入的属性名和id值替换)

更新数据:

curl -XPOST '192.168.1.1:9200/test-index/<_type>/<_id>/_update?pretty' -d '{"doc": { "name": "test2 Name" }}'

删除数据:

curl -XDELETE '192.168.1.1:9200/test-index/<_type>/<_id>?pretty'

查询数据:

curl -XPOST '192.168.1.1:9200/test-index/_search?pretty' -d '{"query": { "match_all": {} }}'

如果有一台elasticsearch磁盘空间不足,将会导致index变成readonly状态,此时扩容后需要用以下命令修改其状态,恢复正常:

curl -XPUT -H "Content-Type: application/json" http://10.202.233.78:9200/_all/_settings -d '{"index.blocks.read_only_allow_delete": null}'

全部查询接口实例请参考:

https://www.cnblogs.com/pilihaotian/p/5830754.html

以上是关于python 查询Elasticsearch的小例子的主要内容,如果未能解决你的问题,请参考以下文章

Hive分桶小例

flask小例

使用 Python 请求查询 ElasticSearch

Elasticsearch:使用 Python elasticsearch-dsl-py 库对 Elasticsearch 进行查询

Elasticsearch:使用 Python elasticsearch-dsl-py 库对 Elasticsearch 进行查询

四十四 Python分布式爬虫打造搜索引擎Scrapy精讲—elasticsearch(搜索引擎)的基本查询