Rebuilding Research Report Search with Elasticsearch: A Practical Account

Posted 六点


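Background:

  1. System overview: analysts read each research report by hand and enter its category, summary, and other metadata. The system then uses the summary and related fields to look up the report's URI.

  2. Existing implementation: the old system stores the summaries and metadata in MSSQL, with each kind of keyword split into its own column for search.

  3. Problems:

    - Querying is tedious and rigid. For example, to find reports from a given institution whose title contains "周报" (weekly report), the user has to tick the corresponding fields and then type a condition into each one.

    - Queries are slow: with close to ten million rows, a response takes 4-5 s.

  4. Improvement: move search to Elasticsearch and support loose matching on several keywords at once. The fields are short rather than long text, so results are not ranked by _score.

    - For example, entering "国泰君安 周报" returns every weekly report (周报) published by 国泰君安 (Guotai Junan).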
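The matching rule behind this example (every keyword must appear somewhere, but each keyword may hit a different field) can be modeled in a few lines of plain Python. This is a simplified sketch for intuition only, not how Elasticsearch works internally: it tests raw substrings instead of ik_max_word tokens, and the function name `matches_all_keywords` is hypothetical. The field list mirrors the multi_match fields used in the query in section 3.

```python
# Toy model (illustration only): split the input on whitespace and require
# each keyword to appear as a substring of at least one searchable field.
# Real Elasticsearch matching compares analyzed IK tokens, not substrings.
SEARCH_FIELDS = ("title", "institution", "grade", "doc_type", "author", "industry")


def matches_all_keywords(doc: dict, query: str) -> bool:
    """Return True if every whitespace-separated keyword hits some field."""
    keywords = query.split()
    return all(
        any(kw in str(doc.get(field, "")) for field in SEARCH_FIELDS)
        for kw in keywords
    )


report = {
    "title": "宏观周报:流动性观察",
    "institution": "国泰君安",
    "doc_type": "宏观研究|周报",
}
print(matches_all_keywords(report, "国泰君安 周报"))  # True: institution + title
print(matches_all_keywords(report, "中信 周报"))      # False: "中信" matches nothing
```

The point of the model is the "AND across keywords, OR across fields" shape, which is what the old one-column-per-condition form could not express in a single input box.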

 

1. Create the index

Fields: title (combined report title), author, institution, industry, grade (rating), doc_type (report category), time (publication time), doc_uri (file address), doc_size (file size), market. The text fields use the ik_max_word analyzer, which comes from the IK analysis plugin, and the request uses the Elasticsearch 6.x mapping-type format (doc_test).

curl -X PUT localhost:9200/src_test_1 -H 'Content-Type: application/json' -d '{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "doc_test": {
      "properties": {
        "title": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "author": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "institution": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "industry": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "grade": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "doc_type": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        },
        "doc_uri": {
          "type": "text",
          "index": false
        },
        "doc_size": {
          "type": "integer",
          "index": false
        },
        "market": {
          "type": "byte"
        }
      }
    }
  }
}'
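Six of the fields in this mapping repeat identical analyzer settings. A sketch that generates the same request body in Python keeps those settings in one place; the helper names `ik_text_field` and `build_index_body` are hypothetical, and the body assumes the same Elasticsearch 6.x single-type layout as the curl request.

```python
import json

# Fields that share the same IK analyzer settings in the mapping above.
IK_TEXT_FIELDS = ["title", "author", "institution", "industry", "grade", "doc_type"]


def ik_text_field() -> dict:
    # ik_max_word is provided by the IK analysis plugin, which must be installed.
    return {"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word"}


def build_index_body(doc_type: str = "doc_test") -> dict:
    """Return the settings + mappings body used to create src_test_1."""
    properties = {name: ik_text_field() for name in IK_TEXT_FIELDS}
    properties.update({
        "time": {"type": "date",
                 "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
        "doc_uri": {"type": "text", "index": False},    # stored, not searchable
        "doc_size": {"type": "integer", "index": False},
        "market": {"type": "byte"},
    })
    return {
        "settings": {"number_of_shards": 1, "number_of_replicas": 0},
        # One mapping type per index (Elasticsearch 6.x style).
        "mappings": {doc_type: {"properties": properties}},
    }


if __name__ == "__main__":
    print(json.dumps(build_index_body(), ensure_ascii=False, indent=2))
```

With a 6.x elasticsearch-py client, the body could then be sent with `Elasticsearch().indices.create(index="src_test_1", body=build_index_body())` instead of curl.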

 

2. Import the data (from CSV, in batches)

import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch()  # defaults to localhost:9200

BATCH_SIZE = 100000  # documents per bulk request
MARKET_CODES = {"CN": 0, "HK": 1, "World": 2}  # any other exchange -> 99

data_will_insert = []

# Read the CSV with pandas; if decoding fails or the text is garbled,
# pass the file's actual encoding, e.g. encoding="gbk" for a GBK-encoded file
src_data = pd.read_csv("ResearchReportEx.csv")


def text_or_empty(value):
    # Missing CSV cells come back as NaN; store them as empty strings
    return "" if pd.isna(value) else value


for index, i in src_data.iterrows():
    # Bulk-insert once a full batch has accumulated
    if len(data_will_insert) >= BATCH_SIZE:
        success, _ = bulk(es, data_will_insert, index="src_test_1", raise_on_error=True)
        print("Performed %d actions" % success)
        data_will_insert = []

    # Map the exchange to a market code
    market = MARKET_CODES.get(i["ExchangeType"], 99)

    data_will_insert.append({
        "_index": "src_test_1",
        "_type": "doc_test",
        "_source": {
            "title": i["Title"],
            "author": i["AuthorName"],
            "time": i["CreateTime"] + ":00",  # "yyyy-MM-dd HH:mm" -> "yyyy-MM-dd HH:mm:ss"
            "institution": i["InstituteNameCN"],
            "doc_type": i["KindName"] if pd.isna(i["Kind2Name"])
                        else "%s|%s" % (i["KindName"], i["Kind2Name"]),
            "industry": text_or_empty(i["IndustryName"]),
            "grade": text_or_empty(i["GradeName"]),
            "doc_uri": i["FileURL"],
            "doc_size": int(i["Size"]),
            "market": market,
        },
    })

# Insert whatever is left in the list
if data_will_insert:
    success, _ = bulk(es, data_will_insert, index="src_test_1", raise_on_error=True)
    print("Performed %d actions" % success)
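The loop above loads the whole CSV into a DataFrame, walks it with iterrows (slow at this scale), and holds up to 100,000 documents in memory before each flush. One alternative, a swapped-in technique rather than the author's code, is to stream rows with the standard csv module and hand a generator to helpers.bulk, which already batches by its chunk_size argument. This sketch assumes a UTF-8 file with the same column names, a CreateTime of the form "yyyy-MM-dd HH:mm", and an integer Size; the helper names and chunk_size=5000 are arbitrary choices. Note that csv.DictReader returns empty strings, not NaN, for missing cells.

```python
import csv

MARKET_CODES = {"CN": 0, "HK": 1, "World": 2}  # any other exchange -> 99


def row_to_action(row: dict, index: str = "src_test_1", doc_type: str = "doc_test") -> dict:
    """Turn one CSV row (columns as in ResearchReportEx.csv) into a bulk action."""
    kind, kind2 = row["KindName"], row.get("Kind2Name") or ""
    return {
        "_index": index,
        "_type": doc_type,
        "_source": {
            "title": row["Title"],
            "author": row["AuthorName"],
            "time": row["CreateTime"] + ":00",  # assumes "yyyy-MM-dd HH:mm" input
            "institution": row["InstituteNameCN"],
            "doc_type": f"{kind}|{kind2}" if kind2 else kind,
            "industry": row.get("IndustryName") or "",
            "grade": row.get("GradeName") or "",
            "doc_uri": row["FileURL"],
            "doc_size": int(row["Size"]),  # assumes an integer byte count
            "market": MARKET_CODES.get(row["ExchangeType"], 99),
        },
    }


def iter_actions(path: str, encoding: str = "utf-8"):
    """Yield bulk actions one row at a time instead of building a big list."""
    with open(path, newline="", encoding=encoding) as f:
        for row in csv.DictReader(f):
            yield row_to_action(row)


if __name__ == "__main__":
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    es = Elasticsearch()
    # bulk() consumes the generator lazily, sending chunk_size actions per request
    success, _ = bulk(es, iter_actions("ResearchReportEx.csv"), chunk_size=5000)
    print("Performed %d actions" % success)
```

Because the generator never materializes more than one request's worth of documents, memory use stays flat regardless of file size.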

 

3. Query

import functools
import time
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

# Elasticsearch connection
es = Elasticsearch()


# Decorator that prints how long the wrapped function took
def cal_run_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        res = func(*args, **kwargs)
        end_time = time.time()
        print("%s ---run time--- %s" % (func.__name__, end_time - start_time))
        return res

    return wrapper


@cal_run_time
def query_in_es():
    body = {
        "query": {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": "国泰 报告",
                            "type": "cross_fields",  # match across fields
                            # Search these 6 fields
                            "fields": ["title", "institution", "grade",
                                       "doc_type", "author", "industry"],
                            # Every term in "query" must match at least one of
                            # the fields, i.e. all terms must appear somewhere
                            # in the fields taken together
                            "operator": "and"
                        }
                    },
                    {
                        "range": {
                            "time": {
                                "gte": "2018-02-01"  # default time limit
                            }
                        }
                    }
                ]
            }
        }
    }

    # Fetch every hit matching body through the scroll API
    scanResp = scan(es, body, scroll="10m", index="src_test_1", doc_type="doc_test", timeout="10m")
    row_num = 0

    for resp in scanResp:
        print(resp["_source"])
        row_num += 1

    print(row_num)


query_in_es()

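※ In testing, search is very fast: a multi-keyword query returns in a few tenths of a second, compared with 4-5 s on the old MSSQL system.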
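scan() walks every hit through the scroll API, which suits exports; a search page typically shows one page at a time. The sketch below builds a paged request body from the raw user input, under stated assumptions: the function name and the page/page_size parameters are hypothetical, results are sorted by time because _score is not used here, and the date range is moved into the bool filter clause, where it is not scored.

```python
def build_search_body(user_input: str, since: str = "2018-02-01",
                      page: int = 1, page_size: int = 20) -> dict:
    """Build a paged, newest-first request body for the multi-keyword search."""
    fields = ["title", "institution", "grade", "doc_type", "author", "industry"]
    return {
        "query": {
            "bool": {
                "must": [
                    {"multi_match": {
                        "query": " ".join(user_input.split()),  # collapse extra spaces
                        "type": "cross_fields",
                        "fields": fields,
                        "operator": "and",  # every keyword must match some field
                    }},
                ],
                # Filter context: the date range restricts hits without scoring
                "filter": [{"range": {"time": {"gte": since}}}],
            }
        },
        # Relevance is not used for these short fields, so show newest first
        "sort": [{"time": {"order": "desc"}}],
        "from": (page - 1) * page_size,
        "size": page_size,
    }


body = build_search_body("国泰君安   周报", page=2)
print(body["from"], body["size"])  # 20 20
```

With a 6.x client this would be sent as `es.search(index="src_test_1", body=body)`. By default Elasticsearch rejects from + size beyond 10,000 (index.max_result_window), so deep paging still calls for scroll or search_after.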
