通过 DataFrame 将数据导入到 ElasticSearch。
Posted youngxuebo
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过 DataFrame 将数据导入到 ElasticSearch。相关的知识,希望对你有一定的参考价值。
样本类:
/**
* 封装 MongoConfig配置
* @param uri MongoDB uri
* @param db 数据库名
*/
case class MongoConfig(val uri:String,val db:String)
/**
* ElasticSearch 配置对象
* @param httpHosts ES通过http连上去,主机名+端口
* @param transportHost ES集群内部通信端口
* @param index ES库名
* @param clusterName 集群名字
*/
case class ESConfig(val httpHosts:String,
val transportHost:String,
val index:String,
/**
* 将 数据保存到ES中
*/
//优化,引入表中缓存
moviesDF.cache()
tagsDF.cache()
import org.apache.spark.sql.functions._
//将tagsDF 对 movies 中的mid做聚合操作
//agg 对其做的什么操作,concat_ws() 拼接函数(和hive中一样),collect_set()拼接哪一列
val tagCollectDF = tagsDF.groupBy("mid").agg(concat_ws("|",collect_set($"tag"))
.as("tags"))
//将 Tags表中的tag(用户对电影的标签),合并到Movies中,产生新的Movie数据集
val esMovieAddTagDF = moviesDF.join(tagCollectDF,Seq("mid","mid"),"left")
.select("mid","name","descri","timelong","issue",
"shoot","language","genres","actors","director","tags")
//将数据写入到ES中
saveData2ES(esMovieAddTagDF)
//去除缓存
moviesDF.unpersist()
tagsDF.unpersist()
spark.close()
private def saveData2ES(esMovieAddTagDF: DataFrame)(implicit esConfig: ESConfig): Unit = {
//连接ES配置
val settings = Settings.builder().put("cluster.name",esConfig.clusterName).build()
//连接ES客户端
val esClient = new PreBuiltTransportClient(settings)
//params += "es.transportHosts" -> "192.168.109.141:9300,192.168.109.142:9300,192.168.109.143:9300"
esConfig.transportHost.split(",")
.foreach{
case ES_HOST_PORT_REGEX(host:String,port:String) =>
esClient.addTransportAddress(
new InetSocketTransportAddress(InetAddress.getByName(host), port.toInt))
}
//判断如果Index是否存在,若存在,则删除
if(esClient.admin().indices().exists(
new IndicesExistsRequest(esConfig.index)).actionGet().isExists){
//删除
esClient.admin.indices.prepareDelete(esConfig.index).get
}
//创建Index
esClient.admin().indices().create(new CreateIndexRequest(esConfig.index)).actionGet()
val movieOptions = Map(("es.node",esConfig.httpHosts),
("es.http.timeout","100ms"),
("es.mapping.id","mid"))
val movieTypeName = s"$esConfig.index/movie"
esMovieAddTagDF.write.options(movieOptions)
.mode("overwrite")
.format("org.elasticsearch.spark.sql")
.save(movieTypeName)
}
以上是关于通过 DataFrame 将数据导入到 ElasticSearch。的主要内容,如果未能解决你的问题,请参考以下文章
将多个csv文件导入pandas并合并到一个DataFrame中