Elasticsearch+Mongo亿级别数据导入及查询实践
Posted 六点
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch+Mongo亿级别数据导入及查询实践相关的知识,希望对你有一定的参考价值。
数据方案:
- 在Elasticsearch中通过code及time字段查询对应doc的mongo_id字段获得mongodb中的主键_id
- 通过获得id再进入mongodb进行查询
1,数据情况:
- 全部为股票及指数的分钟K线数据(股票代码区分度较高)
- Elasticsearch及mongodb都未分片且未优化参数配置,mongo表中只有主键_id索引
- mongodb数据量:
- Elasticsearch数据量:
2,将数据从mongo源库导入Elasticsearch
import time from pymongo import MongoClient from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk es = Elasticsearch() conn = MongoClient(\'127.0.0.1\', 27017) db = conn.kline_db my_set = db.min_kline x = 1 tmp = [] #此处有个坑mongo查询时由于数据量比较大时间较长需要设置游标不过期:no_cursor_timeout=True for i in my_set.find(no_cursor_timeout=True): x+=1 #每次插入100000条 if x%100000 == 99999: #es批量插入 success, _ = bulk(es, tmp, index=\'test_2\', raise_on_error=True) print(\'Performed %d actions\' % success) tmp = [] if i[\'market\'] == \'sz\': market = 0 else: market = 1 #此处有个秒数时间类型及时区转换 tmp.append({"_index":\'test_2\',"_type": \'kline\',\'_source\':{\'code\':i[\'code\'],\'market\':market,\\ \'time\':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(i[\'kline_time\']/1000 - 8*60*60))\\ ,\'mongo_id\':str(i[\'_id\'])}}) #将最后剩余在tmp中的数据插入 if len(tmp)>0: success, _ = bulk(es, tmp, index=\'test_2\', raise_on_error=True) print(\'Performed %d actions\' % success)
3,Elasticsearch+mongo查询时间统计
import time from pymongo import MongoClient from elasticsearch import Elasticsearch from elasticsearch.helpers import scan from bson.objectid import ObjectId #es连接 es = Elasticsearch() #mongo连接 conn = MongoClient(\'127.0.0.1\', 27017) db = conn.kline_db #连接kline_db数据库,没有则自动创建 my_set = db.min_kline tmp = [] #计算运行时间装饰器 def cal_run_time(func): def wrapper(*args,**kwargs): start_time = time.time() res = func(*args,**kwargs) end_time = time.time() print(str(func) +\'---run time--- %s\' % str(end_time-start_time)) return res return wrapper @cal_run_time def query_in_mongo(tmp_list): k_list = [] kline_data = my_set.find({\'_id\':{\'$in\':tmp_list}}) for k in kline_data: k_list.append(k) return k_list @cal_run_time def query_in_es(): #bool多条件查询 must相当于and body = { "query": { "bool": { "must": [{ "range": {#范围查询 "time": { "gte": \'2017-01-10 00:00:00\', # >= "lte": \'2017-04-12 00:00:00\' # <= } } }, {"terms": {# == 或 in:terms 精确查询 "code": [\'000002\',\'000001\'] } } ] } } } #根据body条件记性查询 scanResp = scan(es, body, scroll="10m", index="test_2",doc_type="kline", timeout="10m") #解析结果字典并放入tmp列表中 for resp in scanResp: tmp.append(ObjectId(resp[\'_source\'][\'mongo_id\'])) print(len(tmp)) #--------------此处有个坑,直接使用search方法查询到的结果集中最多只有10条记录---------------- # zz = es.search(index="test_2", doc_type="kline", body=body) # print(zz[\'hits\'][\'total\']) # for resp in zz[\'hits\'][\'hits\']: # tmp.append(ObjectId(resp[\'_source\'][\'mongo_id\'])) query_in_es() query_in_mongo(tmp)
运行结果如下:
第一行:查询的doc个数:28320
第二行:es查询所用时间:0.36s
第三行:mongo使用_id查询所用时间 :0.34s
从结果来看对于3亿多数据的查询Elasticsearch的速度还是相当不错的
※Elasticsearch主要的优势在于可以进行快速的分词模糊查询,所以股票K线这个场景并没有充分发挥其优势,至于查询效率,其实mysql,mongo等只要分库分表合理一样能够达到。
※Elasticsearch+Mongo这个架构主要针对场景:使用mongo存储海量数据,且这张表读写都很频繁。
以上是关于Elasticsearch+Mongo亿级别数据导入及查询实践的主要内容,如果未能解决你的问题,请参考以下文章