Elasticsearch 的 RESTful HTTP 查询封装(Scala + fastjson + scalaj),可封装进 Spark、Flink 任务中使用

Posted Edge535

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Elasticsearch 的 RESTful HTTP 查询封装(Scala + fastjson + scalaj,可封装进 Spark、Flink 任务使用)相关的知识,希望对你有一定的参考价值。

相关依赖

<dependency>
      <groupId>org.scalaj</groupId>
      <artifactId>scalaj-http_2.11</artifactId>
      <version>2.4.2</version>
</dependency>
<dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.75</version>
</dependency>

可直接复制使用的代码

// NOTE(review): JavaConversions is deprecated since Scala 2.12; prefer the explicit
// .asScala / .asJava decorators from JavaConverters in new code.
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import scalaj.http.Http

/** Builds the JSON request body for an Elasticsearch `_search` call.
  *
  * Every condition is a triple `(conditionType, field, value)`:
  *  - `"terms"`   : `value` is a collection of terms (Scala `Iterable`, `java.util.Collection`
  *                  or `Array`), emitted as `{"terms": {field: [...]}}`.
  *  - `"range"`   : `value` is a pair of bounds such as `(("gte", 0), ("lte", 99))`, or a `Map`
  *                  from bound operator to value such as `Map("gte" -> 0)` for a one-sided range.
  *  - `"missing"` : the field must be absent; `value` is ignored (may be `null`). It is expressed
  *                  as an `exists` query with inverted polarity.
  *  - anything else (`"term"`, `"match"`, `"wildcard"`, `"prefix"`, ...) is emitted as
  *    `{conditionType: {field: value}}`.
  * When no condition is given at all, a `match_all` is placed in `bool.must`.
  *
  * @param mustQuerySet    conditions placed in `bool.must`
  * @param mustNotQuerySet conditions placed in `bool.must_not`
  * @param shouldQuerySet  conditions placed in `bool.should`
  * @param fields          `_source` fields to return; empty means no `_source` filter is sent
  * @param sortSet         `(field, order)` pairs such as `("ts", "desc")`, sent as `sort`
  * @param resLimit        page size; its absolute value is sent as `size`. `0` omits `size`, so
  *                        Elasticsearch's default `size` (10) applies -- it does NOT fetch all hits.
  * @return the request body, or `null` if it could not be built (stack trace printed to stderr)
  */
def esHttpSearchParamFormatFunc(
    mustQuerySet: Seq[(String, String, AnyRef)] = Seq(),
    mustNotQuerySet: Seq[(String, String, AnyRef)] = Seq(),
    shouldQuerySet: Seq[(String, String, AnyRef)] = Seq(),
    fields: Seq[String] = Seq(),
    sortSet: Seq[(String, String)] = Seq(),
    resLimit: Int = 0
): String = {
  import scala.util.control.NonFatal

  val must = "must"
  val mustNot = "must_not"
  val should = "should"

  // Builds the single-entry object {key: value}.
  def obj(key: String, value: AnyRef): JSONObject = {
    val o = new JSONObject()
    o.put(key, value)
    o
  }

  // {"exists": {"field": field}}
  def existsQuery(field: String): JSONObject = obj("exists", obj("field", field))

  // Converts a terms value into a JSON array; rejects non-collections explicitly.
  def termsArray(value: AnyRef): JSONArray = {
    val terms = new JSONArray()
    value match {
      case xs: Iterable[_]             => xs.foreach(x => terms.add(x.asInstanceOf[AnyRef]))
      case xs: Array[_]                => xs.foreach(x => terms.add(x.asInstanceOf[AnyRef]))
      case xs: java.util.Collection[_] => xs.toArray.foreach(x => terms.add(x))
      case other =>
        throw new IllegalArgumentException(s"terms value must be a collection, got: $other")
    }
    terms
  }

  // Converts a range value (pair of (op, bound) tuples, or a Map op -> bound) into {op: bound, ...}.
  def rangeBounds(value: AnyRef): JSONObject = {
    val bounds = new JSONObject()
    value match {
      case ((lowerOp, lower), (upperOp, upper)) =>
        bounds.put(lowerOp.toString, lower.asInstanceOf[AnyRef])
        bounds.put(upperOp.toString, upper.asInstanceOf[AnyRef])
      case m: scala.collection.Map[_, _] =>
        m.foreach(kv => bounds.put(kv._1.toString, kv._2.asInstanceOf[AnyRef]))
      case other =>
        throw new IllegalArgumentException(
          s"range value must be ((op, bound), (op, bound)) or a Map, got: $other")
    }
    bounds
  }

  // Resolves one condition into (bool clause it belongs to, query JSON).
  def toClause(clause: String, condType: String, field: String, value: AnyRef): (String, JSONObject) =
    condType match {
      case "terms" => (clause, obj("terms", obj(field, termsArray(value))))
      case "range" => (clause, obj("range", obj(field, rangeBounds(value))))
      case "missing" =>
        if (clause == must) (mustNot, existsQuery(field))      // must be missing  => must_not exists
        else if (clause == mustNot) (must, existsQuery(field)) // must_not missing => must exists
        else {
          // "should be missing" has to stay an optional clause, so the negation is nested:
          // should: {bool: {must_not: [exists]}}. A bare `exists` in `should` would instead
          // favour documents that HAVE the field.
          val notExists = new JSONArray()
          notExists.add(existsQuery(field))
          (should, obj("bool", obj(mustNot, notExists)))
        }
      case _ => (clause, obj(condType, obj(field, value)))
    }

  try {
    val bool = new JSONObject()
    Seq(must, mustNot, should).foreach(c => bool.put(c, new JSONArray()))
    val query = obj("query", obj("bool", bool))

    val conditions =
      mustQuerySet.map((must, _)) ++ mustNotQuerySet.map((mustNot, _)) ++ shouldQuerySet.map((should, _))

    if (conditions.isEmpty) {
      // No condition at all: match every document.
      bool.getJSONArray(must).add(obj("match_all", new JSONObject()))
    } else {
      conditions.foreach { case (clause, (condType, field, value)) =>
        val (target, condition) = toClause(clause, condType, field, value)
        bool.getJSONArray(target).add(condition)
      }
    }

    if (fields.nonEmpty) {
      val source = new JSONArray()
      fields.foreach(f => source.add(f))
      query.put("_source", source)
    }

    if (sortSet.nonEmpty) {
      val sort = new JSONArray()
      sortSet.foreach { case (field, order) => sort.add(obj(field, order)) }
      query.put("sort", sort)
    }

    // abs() keeps tolerating negative limits.
    if (resLimit != 0) query.put("size", Int.box(resLimit.abs))

    query.toJSONString
  } catch {
    case NonFatal(e) =>
      e.printStackTrace()
      null
  }
}

/** Runs an Elasticsearch `_search` over HTTP and returns the `_source` of every returned hit.
  *
  * The request body is produced by [[esHttpSearchParamFormatFunc]] (see it for the accepted
  * condition formats) and POSTed as `application/json` to `addr/table/_search`.
  *
  * @param addr            base URL of the cluster, e.g. `http://127.0.0.1:9200`; a trailing `/` is tolerated
  * @param table           index to search
  * @param mustQuerySet    conditions placed in `bool.must`
  * @param mustNotQuerySet conditions placed in `bool.must_not`
  * @param shouldQuerySet  conditions placed in `bool.should`
  * @param fields          `_source` fields to return; empty returns the whole `_source`
  * @param sortSet         `(field, order)` pairs sent as `sort`
  * @param resLimit        page size; `0` leaves Elasticsearch's default `size` (10) in effect
  * @param connTimeoutMs   HTTP connect timeout in milliseconds
  * @param readTimeoutMs   HTTP read timeout in milliseconds
  * @return the `_source` objects of the hits, in response order; `null` when the body could
  *         not be built, the request failed, or the response has no `hits.hits` array
  *         (details are printed to stderr)
  */
def esHttpSearchFunc(
    addr: String,
    table: String,
    mustQuerySet: Seq[(String, String, AnyRef)] = Seq(),
    mustNotQuerySet: Seq[(String, String, AnyRef)] = Seq(),
    shouldQuerySet: Seq[(String, String, AnyRef)] = Seq(),
    fields: Seq[String] = Seq(),
    sortSet: Seq[(String, String)] = Seq(),
    resLimit: Int = 0,
    connTimeoutMs: Int = 10000,
    readTimeoutMs: Int = 10000
): List[JSONObject] = {
  import scala.util.control.NonFatal

  val searchQuery = esHttpSearchParamFormatFunc(
    mustQuerySet, mustNotQuerySet, shouldQuerySet, fields, sortSet, resLimit)

  if (searchQuery == null) null
  else {
    val url = s"${addr.stripSuffix("/")}/$table/_search"
    try {
      val body = Http(url)
        .timeout(connTimeoutMs, readTimeoutMs)
        .header("Content-Type", "application/json")
        .postData(searchQuery)
        .asString
        .body

      val hits = for {
        response <- Option(JSON.parseObject(body))
        outer    <- Option(response.getJSONObject("hits"))
        inner    <- Option(outer.getJSONArray("hits"))
      } yield inner

      hits match {
        case Some(array) =>
          // Hits are already parsed JSONObjects; read them directly instead of re-parsing.
          (0 until array.size).map(i => array.getJSONObject(i).getJSONObject("_source")).toList
        case None =>
          // e.g. an error response such as {"error": ..., "status": ...}
          System.err.println(s"Elasticsearch response from $url has no hits.hits array: $body")
          null
      }
    } catch {
      case NonFatal(e) =>
        e.printStackTrace()
        null
    }
  }
}

案例

// Sample conditions shared by both examples: one entry per supported condition type.
val sampleMustConditions: Seq[(String, String, AnyRef)] = Seq(
  ("terms", "must_terms_field", List("a", "b", "c")),
  ("missing", "must_missing_field", null),
  ("range", "must_range_field", (("gte", 0), ("lte", 99))),
  ("match", "must_match_field", "must_match_field_value"),
  ("term", "must_term_field", "must_term_field_value"),
  ("wildcard", "must_wildcard_field", "must_wildcard_field_value"),
  ("prefix", "must_prefix_field", "must_prefix_field_value")
)

// Example 1: render only the request body for the conditions above.
esHttpSearchParamFormatFunc(mustQuerySet = sampleMustConditions)

// Example 2: run the full search against a local node, projecting and sorting two fields.
esHttpSearchFunc(
  addr = "http://127.0.0.1:9200",
  table = "table_name",
  mustQuerySet = sampleMustConditions,
  fields = Seq("field1", "field2"),
  sortSet = Seq(("field1", "asc"), ("field2", "desc")),
  resLimit = 5
)

以上是关于 Elasticsearch 的 RESTful HTTP 查询封装(Scala + fastjson + scalaj,可封装进 Spark、Flink 任务使用)的主要内容。如果未能解决你的问题,请参考以下文章:

spring security + oauth2 + reactjs + restful http客户端

如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引

Restful概念

RESTful简单介绍(入门)

什么是RESTful?

Compute API 关键概念 详解