Querying Large Data Sets from Elasticsearch with the Iterator Pattern

Posted by 我是廖志伟

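Create the design-demo project

Project code: https://gitee.com/java_wxid/java_wxid/tree/master/demo/design-demo
Project structure (example): the classes below live in the controller, service, service.impl, and esquery packages under com.example.designdemo.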

Create EsController

The code is as follows (example):

package com.example.designdemo.controller;

import com.example.designdemo.service.EsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EsController {
    @Autowired
    private EsService esService;

    // POST /es: runs the SQL query against Elasticsearch, fetching fetchSize rows per page.
    @PostMapping("es")
    public Boolean query(@RequestParam String query, Long fetchSize) {
        return esService.query(query, fetchSize);
    }
}

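As a rough illustration of how a client might call this endpoint, the sketch below builds (but does not send) a form-encoded POST request with the JDK's built-in HTTP client types. The base URL http://localhost:8080, the index name my_index, and the class name EsEndpointRequest are assumptions made for the example; they do not come from the project.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

class EsEndpointRequest {

    // Encodes both request parameters as an application/x-www-form-urlencoded body.
    static String formBody(String query, long fetchSize) {
        return "query=" + URLEncoder.encode(query, StandardCharsets.UTF_8)
                + "&fetchSize=" + fetchSize;
    }

    // Builds the POST request for the "es" mapping without sending it.
    // baseUrl is an assumption, e.g. http://localhost:8080 for a local Spring Boot app.
    static HttpRequest buildRequest(String baseUrl, String query, long fetchSize) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/es"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(formBody(query, fetchSize)))
                .build();
    }

    public static void main(String[] args) {
        // my_index is a hypothetical index name.
        HttpRequest request = buildRequest("http://localhost:8080", "SELECT * FROM my_index", 100);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(formBody("SELECT * FROM my_index", 100));
    }
}
```

Sending the request would then be a matter of passing it to HttpClient.send with a string body handler; that step is left out here because it needs a running application.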

Create EsService

The code is as follows (example):

package com.example.designdemo.service;

/**
 * @author zhiwei Liao
 * @Description Runs an Elasticsearch SQL query and processes the result page by page.
 * @Date created 2022/9/12 21:03
 */
public interface EsService {

    Boolean query(String query, Long fetchSize);
}

Create EsServiceimpl

The code is as follows (example):

package com.example.designdemo.service.impl;

import com.example.designdemo.esquery.EsQueryProcessor;
import com.example.designdemo.service.EsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.stream.Stream;

@Service
public class EsServiceimpl implements EsService {

    @Autowired
    private EsQueryProcessor esQueryProcessor;

    @Override
    public Boolean query(String query, Long fetchSize) {
        // The stream is lazy: later pages are fetched only as forEach consumes rows.
        Stream<Map<String, Object>> mapStream = esQueryProcessor
                .scrollEsStream(query, fetchSize);
        mapStream.forEach(System.out::println);
        return true;
    }
}

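The service above drains the stream completely, so the Elasticsearch cursor is exhausted by the time forEach returns. If a caller stops early (for example with limit), the cursor stays open on the server until it times out. One possible way to handle that, sketched here with an in-memory iterator rather than a real Elasticsearch call, is to register a cleanup hook with Stream.onClose and consume the stream in try-with-resources. The class name ClosableStreamSketch and the releaseCursor hook are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class ClosableStreamSketch {

    // Wraps an iterator in a lazy sequential stream and registers a cleanup hook
    // that runs when the stream is closed.
    static <T> Stream<T> lazyStream(Iterator<T> source, Runnable releaseCursor) {
        return StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(source, Spliterator.ORDERED), false)
                .onClose(releaseCursor);
    }

    public static void main(String[] args) {
        AtomicBoolean released = new AtomicBoolean(false);
        // Stand-in for rows coming back from Elasticsearch.
        Iterator<String> rows = List.of("row-1", "row-2", "row-3").iterator();

        // try-with-resources closes the stream even though only two rows are read.
        try (Stream<String> stream = lazyStream(rows, () -> released.set(true))) {
            stream.limit(2).forEach(System.out::println);
        }
        System.out.println("released = " + released.get());
    }
}
```

In a real integration the hook would post the current cursor to Elasticsearch's `_sql/close` endpoint; that call is omitted here to keep the sketch self-contained.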
Create EsQueryProcessor

The code is as follows (example):

package com.example.designdemo.esquery;

import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * @Author: zhiwei Liao
 * @Date: 2022/9/29 21:29
 * @Description: Streams Elasticsearch SQL results page by page.
 */
@Component
public class EsQueryProcessor {
    // 1. Return a Stream so that the whole result set never has to be held in memory.
    public Stream<Map<String, Object>> scrollEsStream(String query, Long fetchSize) {
        return StreamSupport.stream(Spliterators
                .spliteratorUnknownSize(new ScrollIterator(query, fetchSize), 0), false);
    }

    // 2. The iterator that drives the paging.
    private class ScrollIterator implements Iterator<Map<String, Object>> {
        private String scrollId;
        private List<String> columns;
        Iterator<Map<String, Object>> iterator;
        // In a real project RestTemplate would be configured and injected as a bean.
        // It is created with new here only to keep the call to the ES endpoint simple.
        RestTemplate restTemplate = new RestTemplate();

        // The constructor runs the first query and initializes the columns,
        // iterator, and scrollId used by the later requests.
        public ScrollIterator(String query, Long fetchSize) {
            EsSqlResult esSqlResult = restTemplate.postForObject("http://localhost:9200/_sql?format=json",
                    new EsSqlQuery(query, fetchSize), EsSqlResult.class); // result of the first request
            this.scrollId = esSqlResult.getCursor();
            this.columns = esSqlResult.getColumns()
                    .stream().map(x -> x.get("name"))
                    .collect(Collectors.toList());
            this.iterator = convert(columns, esSqlResult).iterator();
        }

        // When the current page is drained, hasNext issues the second, third, ...
        // request, and keeps doing so until scrollId is null.
        @Override
        public boolean hasNext() {
            return iterator.hasNext() || scrollNext();
        }

        private boolean scrollNext() {
            if (iterator == null || this.scrollId == null) {
                return false;
            }
            EsSqlResult esSqlResult = restTemplate.postForObject("http://localhost:9200/_sql?format=json",
                    new EsSqlQuery(this.scrollId), EsSqlResult.class); // result of a follow-up request
            this.scrollId = esSqlResult.getCursor();
            this.iterator = convert(columns, esSqlResult).iterator();
            return iterator.hasNext();
        }

        @Override
        public Map<String, Object> next() {
            // Go through hasNext() so that next() also works at a page boundary.
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return iterator.next();
        }
    }

    // 3. Convert each page into the conventional List<Map> shape.
    private List<Map<String, Object>> convert(List<String> columns, EsSqlResult esSqlResult) {
        List<Map<String, Object>> results = new ArrayList<>();
        for (List<Object> row : esSqlResult.getRows()) {
            Map<String, Object> map = new HashMap<>();
            for (int i = 0; i < columns.size(); i++) {
                map.put(columns.get(i), row.get(i));
            }
            results.add(map);
        }
        return results;
    }
}

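The same paging idea can be tried without a running Elasticsearch node. The following is a minimal sketch, not the project's code: Page, PagedIterator, and fakeSource are hypothetical names, the page source is an in-memory function, and (unlike ScrollIterator) the first page is fetched on the first hasNext() call rather than in the constructor. A counter records how many pages are actually requested.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class PagedIteratorSketch {

    // One page of results plus the cursor for the next page (null when there is none).
    static final class Page<T> {
        final List<T> items;
        final String nextCursor;

        Page(List<T> items, String nextCursor) {
            this.items = items;
            this.nextCursor = nextCursor;
        }
    }

    // Iterates over all items, asking fetchPage for another page only when needed.
    static final class PagedIterator<T> implements Iterator<T> {
        private final Function<String, Page<T>> fetchPage;
        private Iterator<T> current = Collections.emptyIterator();
        private String cursor;          // null before the first request and after the last page
        private boolean started;

        PagedIterator(Function<String, Page<T>> fetchPage) {
            this.fetchPage = fetchPage;
        }

        @Override
        public boolean hasNext() {
            // Fetch while the current page is drained and more pages may exist.
            while (!current.hasNext() && (!started || cursor != null)) {
                Page<T> page = fetchPage.apply(cursor);
                started = true;
                current = page.items.iterator();
                cursor = page.nextCursor;
            }
            return current.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current.next();
        }
    }

    static <T> Stream<T> stream(Function<String, Page<T>> fetchPage) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
                new PagedIterator<>(fetchPage), Spliterator.ORDERED), false);
    }

    // In-memory stand-in for the remote API: serves 0..total-1 in pages,
    // using the next offset as the cursor, and counts every request.
    static Function<String, Page<Integer>> fakeSource(int total, int pageSize, AtomicInteger fetches) {
        return cursor -> {
            fetches.incrementAndGet();
            int start = cursor == null ? 0 : Integer.parseInt(cursor);
            int end = Math.min(start + pageSize, total);
            List<Integer> items = IntStream.range(start, end).boxed().collect(Collectors.toList());
            return new Page<>(items, end < total ? String.valueOf(end) : null);
        };
    }

    public static void main(String[] args) {
        AtomicInteger fetches = new AtomicInteger();
        List<Integer> firstFive = stream(fakeSource(10, 3, fetches)).limit(5).collect(Collectors.toList());
        System.out.println(firstFive + " after " + fetches.get() + " page requests");
    }
}
```

With ten items and a page size of three, reading only the first five items triggers two page requests instead of four, which is the memory and network saving the stream-plus-iterator design is after.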
Create EsSqlQuery

The code is as follows (example):

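package com.example.designdemo.esquery;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * @Author: zhiwei Liao
 * @Date: 2022/9/29 21:29
 * @Description: Request body for the Elasticsearch SQL endpoint.
 */
// Leave out null fields so that a follow-up request carries only the cursor.
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EsSqlQuery {
    private String query;
    // The Elasticsearch SQL API names this parameter fetch_size.
    @JsonProperty("fetch_size")
    private Long fetchSize;
    private String cursor;

    // Follow-up request: only the cursor is sent.
    public EsSqlQuery(String cursor) {
        this.cursor = cursor;
    }

    // First request: the SQL text and the page size.
    public EsSqlQuery(String query, Long fetchSize) {
        this.query = query;
        this.fetchSize = fetchSize;
    }

    public String getQuery() {
        return query;
    }

    public void setQuery(String query) {
        this.query = query;
    }

    public Long getFetchSize() {
        return fetchSize;
    }

    public void setFetchSize(Long fetchSize) {
        this.fetchSize = fetchSize;
    }

    public String getCursor() {
        return cursor;
    }

    public void setCursor(String cursor) {
        this.cursor = cursor;
    }
}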

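To make the two request shapes concrete, the sketch below prints the JSON body for a first request and for a follow-up request. It builds the strings by hand only so that the example runs without Jackson; in the project the serialization is done by RestTemplate. The class name EsSqlRequestBodies, the index name my_index, and the cursor value are made up for illustration, and the escaping covers only quotes and backslashes.

```java
class EsSqlRequestBodies {

    // Minimal JSON string escaping: backslashes and double quotes only.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Body of the first request: the SQL text plus the page size.
    static String firstPage(String query, long fetchSize) {
        return "{\"query\":\"" + escape(query) + "\",\"fetch_size\":" + fetchSize + "}";
    }

    // Body of every later request: only the cursor returned by the previous page.
    static String nextPage(String cursor) {
        return "{\"cursor\":\"" + escape(cursor) + "\"}";
    }

    public static void main(String[] args) {
        // my_index and the cursor value are placeholders.
        System.out.println(firstPage("SELECT * FROM my_index", 5));
        System.out.println(nextPage("example-cursor"));
    }
}
```

The follow-up body deliberately omits query and fetch_size, since the cursor already identifies the running query on the server.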
Create EsSqlResult

The code is as follows (example):

package com.example.designdemo.esquery;

import java.util.List;
import java.util.Map;

/**
 * @Author: zhiwei Liao
 * @Date: 2022/9/29 21:29
 * @Description: Response body of the Elasticsearch SQL endpoint.
 */
public class EsSqlResult {
    // Column metadata; each entry carries at least a "name" key.
    private List<Map<String, String>> columns;
    // One inner list per row, with values in column order.
    private List<List<Object>> rows;
    // Cursor for the next page; null once the last page has been returned.
    private String cursor;

    public List<Map<String, String>> getColumns() {
        return columns;
    }

    public void setColumns(List<Map<String, String>> columns) {
        this.columns = columns;
    }

    public List<List<Object>> getRows() {
        return rows;
    }

    public void setRows(List<List<Object>> rows) {
        this.rows = rows;
    }

    public String getCursor() {
        return cursor;
    }

    public void setCursor(String cursor) {
        this.cursor = cursor;
    }
}

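As a small worked example of how the columns and rows of a response turn into one map per row, the sketch below applies the same mapping to hand-written sample data. The column names and values are invented, RowMappingSketch is a hypothetical class, and LinkedHashMap is used here (instead of the HashMap in the project) only so that the printed key order matches the column order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RowMappingSketch {

    // Pairs each row value with the column name at the same position.
    static List<Map<String, Object>> toMaps(List<String> columnNames, List<List<Object>> rows) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (List<Object> row : rows) {
            Map<String, Object> map = new LinkedHashMap<>();
            for (int i = 0; i < columnNames.size(); i++) {
                map.put(columnNames.get(i), row.get(i));
            }
            result.add(map);
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample data standing in for the "columns" and "rows" of a response.
        List<String> columnNames = List.of("name", "age");
        List<List<Object>> rows = List.of(
                List.<Object>of("alice", 30),
                List.<Object>of("bob", 25));

        // prints [{name=alice, age=30}, {name=bob, age=25}]
        System.out.println(toMaps(columnNames, rows));
    }
}
```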