使用 ES Hadoop 连接器在 Elastic Search 中保存 JavaRDD

Posted

技术标签:

【中文标题】使用 ES Hadoop 连接器在 Elastic Search 中保存 JavaRDD【英文标题】:Saving the JavaRDD in Elastic Search using ES Hadoop connector 【发布时间】:2017-05-11 20:10:03 【问题描述】:

目前正在进行一个转换项目,我需要将数据提供给 Oracle 的弹性搜索。所以我的工作是这样的

1. Sqoop - From oracle
2. Java Spark - Dataframe Joins then saving them into elastic search repo's

我的弹性文档看起来像


Field 1: Value
Field 2: value
Field 3: Value
Field 4: [               -- Array of Maps
   
    Name: Value
    Age: Value
   ,
    Name: Value
    Age: Value
   
]
Field 5:                -- Maps
   Code :Value
   Key : Value


所以想知道,如何为上述结构形成一个javaRDD。

我已经编码直到数据帧加入并卡住了,无法从那里继续。 所以我希望我的数据采用标准化形式

我的火花代码

Dataframe esDF = df.select(
df.col("Field1") , df.col("Field2") ,df.col("Field3") 
 ,df.col("Name") ,df.col("Age") ,
  df.col("Code"),df.col("Key")
)

请帮忙。

【问题讨论】:

【参考方案1】:

几个选项:

1 - 在 dataFrame 本身中使用 saveToES 方法。 (旧版本可能不支持,适用于 elasticsearch-spark-20_2.11-5.1.1.jar

import org.apache.spark.sql.SQLContext._
import org.apache.spark.sql.functions._
import org.elasticsearch.spark.sql._

dataFrame.saveToEs("<index>/<type>",Map(("es.nodes" -> <ip:port>"))

2 - 创建一个案例类并使用 RDD[] 方法保存。 (也适用于旧版本)

import org.elasticsearch.spark._
case class ESDoc(...)
val rdd = df.map( row => EsDoc(..))
rdd.saveToEs("<index>/<type>",Map(("es.nodes" -> <ip:port>"))

3 - 使用旧版本的 scala (< 2.11),您将遇到 case class 中 22 个字段的限制。请注意,您可以使用 Map 而不是案例类

import org.elasticsearch.spark._
val rdd  = df.map( row => Map(<key>:<value>...) )
rdd.saveToEs("<index>/<type>",Map(("es.nodes" -> <ip:port>")) // saves RDD[Map<K,V>] 

对于上述所有方法,您可能希望将 es.batch.write.retry.count 传递给适当的值,如果您有另一种控制 EMR 生命周期的方式(确保它不会运行永远)

   val esOptions = Map("es.nodes" -> <host>:<port>, "es.batch.write.retry.count" -> "-1")

【讨论】:

我怀疑下面的方法是否会形成 ES 想要的结构, JavaRDD esRDD = df.javaRDD.map(y -> class.builder() .field1(y.getString(0 )) .field2(y.getString(1)) .field3(y.getString(2)) .field4(ImmutableList.of (ImmutableMap.of ("Name",y.getString(3),"Age",y. getString(4)) )) field5(ImmutableMap.of("Code",y.getString(5),"Key", y.getString(6))) ).build() JavaEsSpark.saveToEs(esRDD, "index/类型”);这将为 field4 创建一个只有一个 ArrayList 的 es doc,对于下一个列表,它将创建第二个文档。但我的案例单个文件应该有地图列表

以上是关于使用 ES Hadoop 连接器在 Elastic Search 中保存 JavaRDD的主要内容,如果未能解决你的问题,请参考以下文章

ES加密,Elastic Search 5.6.1X添加X-pack后,修改连接方式(RestClient 和 客户端模式)

是否可以在 ElasticSearch 中使用 presto 或 Hive (ElasticSearch-Hadoop) 的任何 ES 连接器进行 JOIN 操作?

从Hive导入数据到ES

Docker搭建ES集群配置

Elastic Stack 8.0 再次创建enrollment token

Elastic_exporter 无法连接启用 https 的 uri