Elasticsearch 的 RESTful HTTP 查询封装:Scala + fastjson + scalaj,可封装进 Spark、Flink 任务使用
Posted Edge535
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Elasticsearch 的 RESTful HTTP 查询封装(Scala + fastjson + scalaj,可封装进 Spark、Flink 任务使用)相关的知识,希望对你有一定的参考价值。
相关依赖
<dependency>
<groupId>org.scalaj</groupId>
<artifactId>scalaj-http_2.11</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
复制可用代码
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import scalaj.http.Http
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
/**
 * Builds the JSON body of an Elasticsearch `_search` request.
 *
 * Every clause is a triple `(kind, field, value)`:
 *  - `"terms"`   – `value` is a collection (any `Iterable` or `Array`) of accepted values;
 *  - `"range"`   – `value` is a pair of `(operator, bound)` pairs, e.g. `(("gte", 0), ("lte", 99))`;
 *  - `"missing"` – `value` is ignored; the clause means "the field is absent";
 *  - anything else (`"term"`, `"match"`, `"wildcard"`, `"prefix"`, ...) is emitted as
 *    `{kind: {field: value}}`.
 *
 * When all three clause lists are empty a single `match_all` clause is placed under `must`.
 *
 * @param mustQuerySet    clauses added to `bool.must`
 * @param mustNotQuerySet clauses added to `bool.must_not`
 * @param shouldQuerySet  clauses added to `bool.should`
 * @param fields          field names written to `_source`; omitted when empty
 * @param sortSet         `(field, order)` pairs written to `sort`; omitted when empty
 * @param resLimit        when non-zero, its absolute value is written to `size`; when 0 no `size`
 *                        is sent, so Elasticsearch applies its own default page size (10 per the
 *                        search API documentation) rather than returning every match
 * @return the request body as a JSON string, or `null` if a clause could not be built
 *         (the stack trace is printed)
 */
def esHttpSearchParamFormatFunc(
  mustQuerySet: Seq[(String, String, AnyRef)] = Seq(),
  mustNotQuerySet: Seq[(String, String, AnyRef)] = Seq(),
  shouldQuerySet: Seq[(String, String, AnyRef)] = Seq(),
  fields: Seq[String] = Seq(),
  sortSet: Seq[(String, String)] = Seq(),
  resLimit: Int = 0
): String =
  try {
    val Must = "must"
    val MustNot = "must_not"
    val Should = "should"

    // {kind: {field: value}} -- the shape shared by every leaf clause.
    def fieldClause(kind: String, field: String, value: AnyRef): JSONObject = {
      val byField = new JSONObject()
      byField.put(field, value)
      val clause = new JSONObject()
      clause.put(kind, byField)
      clause
    }

    // {"exists": {"field": field}}
    def existsClause(field: String): JSONObject = fieldClause("exists", "field", field)

    // {"bool": {"must_not": [clause]}} -- negation usable inside a `should` list.
    def negated(clause: JSONObject): JSONObject = {
      val negatedList = new JSONArray()
      negatedList.add(clause)
      val inner = new JSONObject()
      inner.put(MustNot, negatedList)
      val wrapper = new JSONObject()
      wrapper.put("bool", inner)
      wrapper
    }

    def termsClause(field: String, value: AnyRef): JSONObject = {
      val accepted = new JSONArray()
      // Match on the runtime shape instead of an unchecked cast to List[String], so that
      // any collection of any element type is accepted.
      value match {
        case xs: Iterable[_] => xs.foreach(x => accepted.add(x.asInstanceOf[AnyRef]))
        case xs: Array[_]    => xs.foreach(x => accepted.add(x.asInstanceOf[AnyRef]))
        case other =>
          throw new IllegalArgumentException(
            s"terms clause on '$field' expects a collection of values, got: $other")
      }
      fieldClause("terms", field, accepted)
    }

    def rangeClause(field: String, value: AnyRef): JSONObject = {
      val bounds = new JSONObject()
      value match {
        case ((lowOp: String, low), (highOp: String, high)) =>
          bounds.put(lowOp, low.asInstanceOf[AnyRef])
          bounds.put(highOp, high.asInstanceOf[AnyRef])
        case other =>
          throw new IllegalArgumentException(
            s"range clause on '$field' expects ((op, bound), (op, bound)), got: $other")
      }
      fieldClause("range", field, bounds)
    }

    // Returns the bool list the clause belongs in, together with the clause itself.
    def place(occur: String, kind: String, field: String, value: AnyRef): (String, JSONObject) =
      kind match {
        case "terms" => occur -> termsClause(field, value)
        case "range" => occur -> rangeClause(field, value)
        case "missing" =>
          // "missing" is expressed through `exists`, so the occurrence has to be inverted.
          occur match {
            case Must    => MustNot -> existsClause(field)
            case MustNot => Must -> existsClause(field)
            // `should` has no opposite list; a bare `exists` here would match documents that
            // HAVE the field, so the negation is wrapped in a nested bool.
            case _       => Should -> negated(existsClause(field))
          }
        case _ => occur -> fieldClause(kind, field, value)
      }

    val boolBody = new JSONObject()
    Seq(Must, MustNot, Should).foreach(occur => boolBody.put(occur, new JSONArray()))

    val tagged =
      mustQuerySet.map(Must -> _) ++
        mustNotQuerySet.map(MustNot -> _) ++
        shouldQuerySet.map(Should -> _)

    if (tagged.isEmpty) {
      val matchAll = new JSONObject()
      matchAll.put("match_all", new JSONObject())
      boolBody.getJSONArray(Must).add(matchAll)
    } else {
      tagged.foreach { case (occur, (kind, field, value)) =>
        val (target, clause) = place(occur, kind, field, value)
        boolBody.getJSONArray(target).add(clause)
      }
    }

    val queryPart = new JSONObject()
    queryPart.put("bool", boolBody)
    val request = new JSONObject()
    request.put("query", queryPart)

    if (fields.nonEmpty) {
      val source = new JSONArray()
      fields.foreach(name => source.add(name))
      request.put("_source", source)
    }

    if (sortSet.nonEmpty) {
      val sort = new JSONArray()
      sortSet.foreach { case (field, order) =>
        val item = new JSONObject()
        item.put(field, order)
        sort.add(item)
      }
      request.put("sort", sort)
    }

    if (resLimit != 0) request.put("size", Int.box(resLimit.abs))

    request.toJSONString
  } catch {
    case NonFatal(e) => e.printStackTrace(); null
  }

/**
 * Runs a bool query against `addr/table/_search` over HTTP and returns the `_source` object of
 * every hit in the response.
 *
 * The clause, `fields`, `sortSet` and `resLimit` arguments are passed unchanged to
 * [[esHttpSearchParamFormatFunc]]; see there for the accepted clause shapes.
 *
 * @param addr            base URL of the cluster, e.g. `http://ip:port` (a trailing `/` is tolerated)
 * @param table           index name placed in the request path
 * @param mustQuerySet    clauses for `bool.must`
 * @param mustNotQuerySet clauses for `bool.must_not`
 * @param shouldQuerySet  clauses for `bool.should`
 * @param fields          `_source` field filter; empty means no filter is sent
 * @param sortSet         `(field, order)` sort pairs
 * @param resLimit        requested `size`; 0 means no `size` is sent (Elasticsearch default applies)
 * @return the `_source` of each hit, or `null` when the request body could not be built, the
 *         HTTP call failed, or the response has no `hits.hits` array (the stack trace is printed)
 */
def esHttpSearchFunc(
  addr: String,
  table: String,
  mustQuerySet: Seq[(String, String, AnyRef)] = Seq(),
  mustNotQuerySet: Seq[(String, String, AnyRef)] = Seq(),
  shouldQuerySet: Seq[(String, String, AnyRef)] = Seq(),
  fields: Seq[String] = Seq(),
  sortSet: Seq[(String, String)] = Seq(),
  resLimit: Int = 0
): List[JSONObject] = {
  val searchQuery = esHttpSearchParamFormatFunc(
    mustQuerySet, mustNotQuerySet, shouldQuerySet, fields, sortSet, resLimit)

  if (searchQuery == null) null
  else
    try {
      val endpoint = s"${addr.stripSuffix("/")}/$table/_search"
      val body = Http(endpoint)
        .timeout(10000, 10000)
        .headers(Seq(("Content-Type", "application/json")))
        .postData(searchQuery)
        .asString
        .body

      // An error response carries no `hits` object; report it with the payload instead of
      // failing later with a bare NullPointerException.
      val sources = for {
        root  <- Option(JSON.parseObject(body))
        outer <- Option(root.getJSONObject("hits"))
        inner <- Option(outer.getJSONArray("hits"))
      } yield (0 until inner.size()).map(i => inner.getJSONObject(i).getJSONObject("_source")).toList

      sources.getOrElse(
        throw new IllegalStateException(s"Unexpected Elasticsearch response from $endpoint: $body"))
    } catch {
      case NonFatal(e) => e.printStackTrace(); null
    }
}
案例
// Example: formatting the query parameters.
// Passes only "must" clauses, one per clause kind used in this article:
// terms (a list of values), missing (value is null), range (a pair of (operator, bound) pairs)
// and four single-value kinds (match, term, wildcard, prefix).
esHttpSearchParamFormatFunc(
mustQuerySet = Seq(
("terms", "must_terms_field", List("a","b","c") ),
("missing", "must_missing_field", null ),
("range", "must_range_field", (("gte",0),("lte",99)) ),
("match", "must_match_field", "must_match_field_value" ),
("term", "must_term_field", "must_term_field_value" ),
("wildcard", "must_wildcard_field", "must_wildcard_field_value" ),
("prefix", "must_prefix_field", "must_prefix_field_value" )
)
)
// Example: calling the wrapper.
// Sends the same "must" clauses as above to the `table_name` index at http://127.0.0.1:9200,
// restricts `_source` to field1/field2, sorts by field1 asc then field2 desc,
// and requests a `size` of 5.
esHttpSearchFunc(
addr = "http://127.0.0.1:9200",
table = "table_name",
mustQuerySet = Seq(
("terms", "must_terms_field", List("a","b","c") ),
("missing", "must_missing_field", null ),
("range", "must_range_field", (("gte",0),("lte",99)) ),
("match", "must_match_field", "must_match_field_value" ),
("term", "must_term_field", "must_term_field_value" ),
("wildcard", "must_wildcard_field", "must_wildcard_field_value" ),
("prefix", "must_prefix_field", "must_prefix_field_value" )
),
fields = Seq("field1","field2"),
sortSet = Seq(("field1","asc"),("field2","desc")),
resLimit = 5
)
以上是关于 Elasticsearch 的 RESTful HTTP 查询封装(Scala + fastjson + scalaj,可封装进 Spark、Flink 任务使用)的主要内容,如果未能解决你的问题,请参考以下文章
spring security + oauth2 + reactjs + restful http客户端