通过 DataFrame 将数据导入到 ElasticSearch。

Posted youngxuebo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过 DataFrame 将数据导入到 ElasticSearch。相关的知识,希望对你有一定的参考价值。

样本类:

/**
  * 封装 MongoConfig配置
  * @param uri  MongoDB uri
  * @param db  数据库名
  */
case class MongoConfig(val uri:String,val db:String)

/**
  * ElasticSearch 配置对象
  * @param httpHosts  ES通过http连上去,主机名+端口
  * @param transportHost ES集群内部通信端口
  * @param index ES库名
  * @param clusterName 集群名字
  */
case class ESConfig(val httpHosts:String,
                    val transportHost:String,
                    val index:String,
        
    /**
      * 将 数据保存到ES中
      */
    //优化,引入表中缓存
    moviesDF.cache()
    tagsDF.cache()

    import org.apache.spark.sql.functions._
    //将tagsDF 对 movies 中的mid做聚合操作
    //agg 对其做的什么操作,concat_ws() 拼接函数(和hive中一样),collect_set()拼接哪一列
    val tagCollectDF = tagsDF.groupBy("mid").agg(concat_ws("|",collect_set($"tag"))
      .as("tags"))

    //将 Tags表中的tag(用户对电影的标签),合并到Movies中,产生新的Movie数据集
    val esMovieAddTagDF = moviesDF.join(tagCollectDF,Seq("mid","mid"),"left")
      .select("mid","name","descri","timelong","issue",
        "shoot","language","genres","actors","director","tags")

    //将数据写入到ES中
    saveData2ES(esMovieAddTagDF)

    //去除缓存
    moviesDF.unpersist()
    tagsDF.unpersist()

    spark.close()
private def saveData2ES(esMovieAddTagDF: DataFrame)(implicit esConfig: ESConfig): Unit = {

    //连接ES配置
    val settings = Settings.builder().put("cluster.name",esConfig.clusterName).build()

    //连接ES客户端
    val esClient = new PreBuiltTransportClient(settings)

    //params += "es.transportHosts" -> "192.168.109.141:9300,192.168.109.142:9300,192.168.109.143:9300"
    esConfig.transportHost.split(",")
      .foreach{
        case ES_HOST_PORT_REGEX(host:String,port:String) =>
          esClient.addTransportAddress(
            new InetSocketTransportAddress(InetAddress.getByName(host), port.toInt))
      }

    //判断如果Index是否存在,若存在,则删除
    if(esClient.admin().indices().exists(
      new IndicesExistsRequest(esConfig.index)).actionGet().isExists){
      //删除
      esClient.admin.indices.prepareDelete(esConfig.index).get

    }

    //创建Index
    esClient.admin().indices().create(new CreateIndexRequest(esConfig.index)).actionGet()


    val movieOptions = Map(("es.node",esConfig.httpHosts),
      ("es.http.timeout","100ms"),
      ("es.mapping.id","mid"))

    val movieTypeName = s"$esConfig.index/movie"

    esMovieAddTagDF.write.options(movieOptions)
      .mode("overwrite")
      .format("org.elasticsearch.spark.sql")
      .save(movieTypeName)

  }

以上是关于通过 DataFrame 将数据导入到 ElasticSearch。的主要内容,如果未能解决你的问题,请参考以下文章

如何将dataframe导入到excel且不覆盖原有内容

将多个csv文件导入pandas并合并到一个DataFrame中

Mysql 数据同步到 Elasticsearch

将 Kaggle csv 从下载 url 导入到 pandas DataFrame

怎么通过nrows参数导入数据

将带有列表的JSON dict导入Dataframe [重复]