es修正query

Posted NAVYSUMMER

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了es修正query相关的知识,希望对你有一定的参考价值。

def es_mapping2dict(mapping):
    """Convert an Elasticsearch mapping into a nested ``{field: type}`` dict.

    Leaf fields map to their ``type`` string.  Multi-fields declared under
    ``fields`` are added next to their parent as ``"<field>.<sub_field>"``
    (for example ``"name.keyword"``).  Object fields that carry their own
    ``properties`` are converted recursively into a nested dict.

    Args:
        mapping: A mapping (or sub-mapping) dict holding a ``properties`` key.

    Returns:
        The converted dict; empty when ``mapping`` is not a dict or has no
        usable ``properties``.
    """
    mapping_dict = dict()

    if not isinstance(mapping, dict):
        return mapping_dict
    properties = mapping.get("properties")
    if not isinstance(properties, dict):
        return mapping_dict

    for k, v in properties.items():
        if not isinstance(v, dict):
            continue
        if "properties" in v:
            # Object field: descend into its own properties.
            mapping_dict[k] = es_mapping2dict(v)
            continue
        if "type" not in v:
            continue
        mapping_dict[k] = v["type"]
        fields = v.get("fields")
        if isinstance(fields, dict):
            for fk, fv in fields.items():
                if isinstance(fv, dict) and "type" in fv:
                    # Key each sub-field by its full dotted path so that
                    # different sub-fields do not overwrite one another.
                    mapping_dict[f"{k}.{fk}"] = fv["type"]

    return mapping_dict


def data2single_dict(source, parent_name: str = ""):
    """Flatten nested dicts/lists/tuples into a single-level dict.

    Keys of the result are dotted paths: dict keys are joined with ``.`` and
    list/tuple elements contribute their index (``"a.0.b"``).  Empty
    containers contribute nothing.

    Args:
        source: The value to flatten.
        parent_name: Path prefix for every key produced from ``source``.

    Returns:
        A flat dict mapping each dotted path to its leaf value.  A scalar
        ``source`` yields ``{parent_name: source}``.
    """
    result = {}
    if isinstance(source, dict):
        for k, v in source.items():
            column_name = f"{parent_name}.{k}" if parent_name else k
            if isinstance(v, (list, tuple)):
                # Index the elements here rather than via recursion so the
                # key is kept as a prefix even when it is falsy.
                for i, vv in enumerate(v):
                    result.update(data2single_dict(vv, f"{column_name}.{i}"))
            else:
                result.update(data2single_dict(v, column_name))
    elif isinstance(source, (list, tuple)):
        for i, v in enumerate(source):
            column_name = f"{parent_name}.{i}" if parent_name else f"{i}"
            # column_name is always a non-empty string here, so recursing
            # gives nested lists their own indices instead of letting
            # their elements overwrite each other under one key.
            result.update(data2single_dict(v, column_name))
    else:
        result[parent_name] = source
    return result


_KEYWORD_SUFFIX = ".keyword"
_EXACT_VALUE_QUERIES = ("term", "terms", "range", "wildcard")
_FULL_TEXT_QUERIES = ("match", "match_phrase", "match_phrase_prefix")


def _strip_keyword_suffix(field):
    """Return ``field`` without a trailing ``.keyword``, if it has one."""
    if isinstance(field, str) and field.endswith(_KEYWORD_SUFFIX):
        return field[:-len(_KEYWORD_SUFFIX)]
    return field


def _amend_exact_field(field, amend_dict):
    """Choose the field name an exact-value clause should target."""
    if not isinstance(field, str):
        return field
    if field.endswith(_KEYWORD_SUFFIX):
        # Keep the suffix only when the mapping really has that sub-field.
        if amend_dict.get(field) is None:
            return field[:-len(_KEYWORD_SUFFIX)]
        return field
    candidate = f"{field}{_KEYWORD_SUFFIX}"
    if amend_dict.get(candidate) == "keyword":
        return candidate
    return field


def amend_query_keyword(query, amend_dict: dict):
    """Fix ``.keyword`` suffixes on field names throughout a query body.

    * ``term`` / ``terms`` / ``range`` / ``wildcard`` / ``exists``: append
      ``.keyword`` when ``amend_dict`` lists ``<field>.keyword`` with type
      ``"keyword"``; drop an existing ``.keyword`` suffix when ``amend_dict``
      has no entry for it.
    * ``match`` / ``match_phrase`` / ``match_phrase_prefix`` /
      ``multi_match``: always drop a ``.keyword`` suffix.
    * Any other key is processed recursively.

    Args:
        query: A query body (dict), a list of clauses, or a leaf value.
        amend_dict: Flat ``{dotted_field_path: type}`` lookup.

    Returns:
        The amended query.  Dicts are modified in place and returned; lists
        are returned as new lists; other values are returned unchanged.
    """
    if isinstance(query, list):
        return [amend_query_keyword(q, amend_dict) for q in query]
    if not isinstance(query, dict):
        return query

    # Only existing keys are reassigned, so the dict size never changes
    # while it is being iterated.
    for key, value in query.items():
        if key in _EXACT_VALUE_QUERIES and isinstance(value, dict):
            query[key] = {
                _amend_exact_field(k, amend_dict): v for k, v in value.items()
            }
        elif key in _FULL_TEXT_QUERIES and isinstance(value, dict):
            query[key] = {_strip_keyword_suffix(k): v for k, v in value.items()}
        elif key == "multi_match" and isinstance(value, dict):
            # Copy first so other options (operator, type, ...) survive.
            amended = dict(value)
            amended["fields"] = [
                _strip_keyword_suffix(field) for field in value.get("fields") or []
            ]
            query[key] = amended
        elif key == "exists" and isinstance(value, dict):
            amended = dict(value)
            if "field" in value:
                amended["field"] = _amend_exact_field(value["field"], amend_dict)
            query[key] = amended
        else:
            query[key] = amend_query_keyword(value, amend_dict)
    return query


def amend_query(query, mapping):
    """Amend the field names in ``query`` using an Elasticsearch ``mapping``.

    The mapping is converted with ``es_mapping2dict``, flattened with
    ``data2single_dict``, and the flat result is handed to
    ``amend_query_keyword`` together with ``query``.

    Args:
        query: The query body to amend.
        mapping: The index mapping describing the available fields.

    Returns:
        Whatever ``amend_query_keyword`` returns for ``query``.
    """
    flat_field_types = data2single_dict(es_mapping2dict(mapping))
    return amend_query_keyword(query, flat_field_types)


# Sample index mapping: an object field with a date sub-field, and a text
# field that also exposes a ``keyword`` multi-field.
mapping = {
    "properties": {
        "basic": {
            "properties": {
                "establish_date": {
                    "type": "date"
                },
            }
        },
        "name": {
            "type": "text",
            "fields": {
                "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                }
            }
        }
    }
}

# Sample query that mixes exact-value and full-text clauses, some with a
# wrong or missing ``.keyword`` suffix.
query = {
    "query": {
        "bool": {
            "must": [
                {
                    "wildcard": {
                        "name": {
                            "value": "*北京*"
                        }
                    }
                },
                {
                    "term": {
                        "name": {
                            "value": "北京百度科技有限公司"
                        }
                    }
                },
                {
                    "terms": {
                        "name": ["北京百度科技有限公司"]
                    }
                },
                {
                    "match": {
                        "name.keyword": "北京"
                    }
                },
                {
                    "range": {
                        "basic.establish_date.keyword": {
                            "gte": "2001-01-01",
                            "lte": "2023-12-31"
                        }
                    }
                },
                {
                    "exists": {
                        "field": "name"
                    }
                }
            ]
        }
    },
    "track_total_hits": True
}

print(amend_query(query, mapping))

  

es xxx_by_query

xxx_by_query包括_delete_by_query和_update_by_query,下面分开讲

_delete_by_query

相当于SQL中的 DELETE FROM a WHERE ...,即删除满足条件的文档。

在Dev Tools执行的格式是

POST teacher/_doc/_delete_by_query
{
	"query": {
		"range": {
			"updated_date": {
				"from": 1534007400000,
				"to": 1534007700000
			}
		}
	}
}

值得注意的是,Java High Level REST Client截止到6.3.2版本(elasticsearch-rest-high-level-client-6.3.2.jar),尽管有DeleteByQueryRequest类,可以通过DeleteByQueryRequest(SearchRequest search)构造器构造一个实例,但是RestHighLevelClient实例没有合适的方法来操作DeleteByQueryRequest实例(如RestHighLevelClient实例可以用index(XxxRequest xxxRequest, Header... headers)实例方法来操作索引的文档,可以用indices()实例方法获取IndicesClient实例,进而通过其各种实例方法来操作索引,但是没有方法的入参是DeleteByQueryRequest实例)。

想在java代码中实现delete_by_query功能,有2种替代方法:

1.先用searchRequest筛选出符合条件的文档的id集合,然后根据id用deleteRequest来删除。

2.调用RestHighLevelClient实例的getLowLevelClient()方法获取org.elasticsearch.client.RestClient实例,即low-level client,然后调用其performRequest(String method, String endpoint, Map<String, String> params, HttpEntity entity, Header... headers)方法,其中method是"POST",endpoint的格式是"/{index}/{type}/_delete_by_query"(例如下面示例中的"/teacher/_doc/_delete_by_query"),代码示例如下:

    /**
     * Demo entry point: sends a delete-by-query request for the "teacher" index
     * through the low-level client obtained from a RestHighLevelClient, then logs
     * the parsed response.
     *
     * @param args unused
     * @throws Exception if closing the client fails; request failures are logged
     */
    public static void main(String[] args) throws Exception {
        HttpHost firstNode = new HttpHost("192.168.56.11", 9200, "http");
        HttpHost secondNode = new HttpHost("192.168.56.12", 9201, "http");
        RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(firstNode, secondNode));
        try {
            String requestBody = generateQueryString();
            HttpEntity entity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
            Response response = client.getLowLevelClient()
                    .performRequest("POST", "/teacher/_doc/_delete_by_query", Collections.emptyMap(), entity);
            String responseBody = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
            Map responseMap = JSON.parseObject(responseBody);
            LOGGER.info(responseMap);
        } catch (Exception e) {
            LOGGER.error("client delete_by_query exception", e);
        } finally {
            // The client is always non-null here, so it can be closed unconditionally.
            client.close();
        }
    }

    /**
     * Builds the JSON body of the delete-by-query request: a range query on
     * "updated_date" between two hard-coded epoch-millisecond bounds.
     *
     * @return the request body as a JSON string
     * @throws IllegalStateException if the JSON body cannot be built
     */
    public static String generateQueryString() {
        long begin = 1534015700000L;
        long end = 1534016700000L;
        try {
            XContentBuilder builder = JsonXContent.contentBuilder()
                    .startObject()
                    .startObject("query")
                    .startObject("range")
                    .startObject("updated_date").field("from", begin).field("to", end)
                    .endObject()
                    .endObject()
                    .endObject()
                    .endObject();
            // IndexRequest is used only as a way to turn the builder into a string.
            IndexRequest indexRequest = new IndexRequest();
            indexRequest.source(builder);
            return indexRequest.source().utf8ToString();
        } catch (IOException e) {
            // Fail with the real cause instead of continuing and dereferencing
            // a source that was never set.
            throw new IllegalStateException("failed to build delete_by_query request body", e);
        }
    }

 

以上是关于es修正query的主要内容,如果未能解决你的问题,请参考以下文章

ES6学习一

ES: update by query

ES Query DSL

es xxx_by_query

ES 23 - 检索和过滤的区别 (query vs. filter)

ES(elasticsearch) query DSL 查询语法