Elasticsearch+Mongo亿级别数据导入及查询实践

Posted 六点

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch+Mongo亿级别数据导入及查询实践相关的知识,希望对你有一定的参考价值。

数据方案:
  • 在Elasticsearch中通过code及time字段查询对应doc的mongo_id字段获得mongodb中的主键_id
  • 通过获得id再进入mongodb进行查询
 
1,数据情况:
  • 全部为股票及指数的分钟K线数据(股票代码区分度较高)
  • Elasticsearch及mongodb都未分片且未优化参数配置,mongo表中只有主键_id索引
  • mongodb数据量:

    

  • Elasticsearch数据量:

    

2,将数据从mongo源库导入Elasticsearch

import time
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
es = Elasticsearch()

conn = MongoClient(\'127.0.0.1\', 27017)
db = conn.kline_db
my_set = db.min_kline
x = 1
tmp = []

#此处有个坑mongo查询时由于数据量比较大时间较长需要设置游标不过期:no_cursor_timeout=True
for i in my_set.find(no_cursor_timeout=True):
    x+=1
    #每次插入100000条
    if x%100000 == 99999:
        #es批量插入
        success, _ = bulk(es, tmp, index=\'test_2\', raise_on_error=True)
        print(\'Performed %d actions\' % success)
        tmp = []
    if i[\'market\'] == \'sz\':
        market = 0
    else:
        market = 1
    #此处有个秒数时间类型及时区转换
    tmp.append({"_index":\'test_2\',"_type": \'kline\',\'_source\':{\'code\':i[\'code\'],\'market\':market,\\
                \'time\':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(i[\'kline_time\']/1000 - 8*60*60))\\
                ,\'mongo_id\':str(i[\'_id\'])}})

#将最后剩余在tmp中的数据插入
if len(tmp)>0:
    success, _ = bulk(es, tmp, index=\'test_2\', raise_on_error=True)
    print(\'Performed %d actions\' % success)

3,Elasticsearch+mongo查询时间统计

import time
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from bson.objectid import ObjectId

#es连接
es = Elasticsearch()

#mongo连接
conn = MongoClient(\'127.0.0.1\', 27017)
db = conn.kline_db  #连接kline_db数据库,没有则自动创建
my_set = db.min_kline

tmp = []

#计算运行时间装饰器
def cal_run_time(func):
    def wrapper(*args,**kwargs):
        start_time = time.time()
        res = func(*args,**kwargs)
        end_time = time.time()
        print(str(func) +\'---run time--- %s\' % str(end_time-start_time))
        return res
    return wrapper

@cal_run_time
def query_in_mongo(tmp_list):
    k_list = []
    kline_data = my_set.find({\'_id\':{\'$in\':tmp_list}})
    for k in kline_data:
        k_list.append(k)
    return k_list

@cal_run_time
def query_in_es():
    #bool多条件查询 must相当于and
    body = {
        "query": {
            "bool": {
                "must": [{
                    "range": {#范围查询
                        "time": {
                            "gte": \'2017-01-10 00:00:00\',  # >=
                            "lte": \'2017-04-12 00:00:00\'  # <=
                        }
                    }
                },
                    {"terms": {# == 或  in:terms 精确查询
                        "code": [\'000002\',\'000001\']
                    }
                    }
                ]
            }

        }
    }

    #根据body条件记性查询
    scanResp = scan(es, body, scroll="10m", index="test_2",doc_type="kline", timeout="10m")

    #解析结果字典并放入tmp列表中
    for resp in scanResp:
        tmp.append(ObjectId(resp[\'_source\'][\'mongo_id\']))

    print(len(tmp))

    #--------------此处有个坑,直接使用search方法查询到的结果集中最多只有10条记录----------------
    # zz = es.search(index="test_2", doc_type="kline", body=body)
    # print(zz[\'hits\'][\'total\'])
    # for resp in zz[\'hits\'][\'hits\']:
    #     tmp.append(ObjectId(resp[\'_source\'][\'mongo_id\']))

query_in_es()

query_in_mongo(tmp)

运行结果如下:

第一行:查询的doc个数:28320

第二行:es查询所用时间:0.36s

第三行:mongo使用_id查询所用时间 :0.34s

从结果来看对于3亿多数据的查询Elasticsearch的速度还是相当不错的

※Elasticsearch主要的优势在于可以进行快速的分词模糊查询,所以股票K线这个场景并没有充分发挥其优势,至于查询效率,其实mysql,mongo等只要分库分表合理一样能够达到。

※Elasticsearch+Mongo这个架构主要针对场景:使用mongo存储海量数据,且这张表读写都很频繁。

以上是关于Elasticsearch+Mongo亿级别数据导入及查询实践的主要内容,如果未能解决你的问题,请参考以下文章

亿级 Elasticsearch 性能优化

亿级 Elasticsearch 性能优化

亿级 Elasticsearch 性能优化

[案例]如何异构一个数十亿级别的数据库

2.es与mongodb的区别

2.es与mongodb的区别