Spark + ElasticSearch 返回 RDD[(String, Map[String, Any])]。我怎样才能操纵任何?

Posted

技术标签:

【中文标题】Spark + ElasticSearch 返回 RDD[(String, Map[String, Any])]。我怎样才能操纵任何?【英文标题】:Spark + ElasticSearch returns RDD[(String, Map[String, Any])]. How can I manipulate Any? 【发布时间】:2015-07-01 23:15:22 【问题描述】:

Scala 2.10.4

弹性搜索 1.4.4

火花 1.3

Elasticsearch-hadoop 2.1.0.Beta3

按照docs,我可以从 ES 文档创建一个 rdd,如下所示:

val data sc.edRDD('indexName/type')

返回一个类型为元组的RDD

(String, Map[String, Any])

其中 String 是文档 ID,Map 表示 ES 文档,字段名:String -> 字段值:Any。但是因为值类型是 Any,所以我无法操作该数据。例如。如果我愿意

data.first()._2.get("someField")

字段值在 ES 中是 double,但在 Spark 中是 Any。如果我打印这个值,它就像

Buffer(61.15)

尝试使用 .asInstanceOf[Double]、.toDouble 或它们的组合进行强制转换会产生如下异常:

java.lang.ClassCastException: scala.collection.convert.Wrappers$JListWrapper cannot be cast to java.lang.Double

在这个简单的案例中,操作数据的正确方法是什么?为了让事情变得更加严峻,我真正想要的数据存在于嵌套文档中,这意味着原始 Map 中的 Any 类型值本身就是 Map[String, Any] 并且这些 Any 类型值有时是数字,有时是元组列表,具体取决于在钥匙上。

【问题讨论】:

如果您碰巧将原始 _source 与您的文档一起存储,那么您可以从每个 Map 中获取 _source,它始终是一个字符串,然后自己将其解析为一个真实的全类型模型类。您可以在map 操作中执行此操作,将您从(String, Map[String,Any]) 带到T,其中T 是您的模型类类型。只是一个想法,但它可能会起作用 我可以确定 _source 字符串中的字段将始终按指定的(看起来是字母顺序)顺序出现吗? 我不知道这些字段的顺序保证,但是如果你使用 json 解析器,比如 Jackson,它也不应该关心字段的顺序 【参考方案1】:

您看到的问题是因为返回的类型 is a buffer - 不是 Double。

如果将其强制转换为 Buffer,则不会出现类强制转换异常。例如,在 ES 中对以下文档进行索引:


    "myDouble" : 4.20,
    "myString" : "test",
    "myList" : [1.2, 93.2, 42.3]

然后从spark查询:

val documents = sc.esRDD("test/test", "?q=*")
val doc = documents.first()._2

println(doc.get("myDouble").get.asInstanceOf[Double]) # 4.2
println(doc.get("myString").get.asInstanceOf[String]) # test

import scala.collection.mutable.Buffer

println(doc.get("myList").get.asInstanceOf[Buffer[Double]]) 
# Buffer(1.2, 93.2, 42.3)

println(doc.get("myList").get.asInstanceOf[Buffer[Double]].mkString(","))
# 1.2,93.2,42.3

【讨论】:

非常好的答案。我只想补充一个方便的方法,比如 getFromDoc[T](doc, key) 在这里构建可能有用。

以上是关于Spark + ElasticSearch 返回 RDD[(String, Map[String, Any])]。我怎样才能操纵任何?的主要内容,如果未能解决你的问题,请参考以下文章

Spark-Cassandra 与 Spark-Elasticsearch

elasticsearch spark hadoop integration

Spark和Elasticsearch交互

Spark hive to ES Elasticsearch

尝试在 presto ( spark ) 上运行 Elasticsearch 查询时连接被拒绝

如何使用python将Spark数据写入ElasticSearch