使用 Spark 中的复杂过滤从 elasticsearch 中获取 esJsonRDD

Posted

技术标签:

【中文标题】使用 Spark 中的复杂过滤从 elasticsearch 中获取 esJsonRDD【英文标题】:Fetching esJsonRDD from elasticsearch with complex filtering in Spark 【发布时间】:2017-06-13 16:48:56 【问题描述】:

我目前正在基于单行弹性查询的Spark Job 过滤中获取elasticsearch RDD(示例):

val elasticRdds = sparkContext.esJsonRDD(esIndex, s"?default_operator=AND&q=director.name:DAVID + \n movie.name:SEVEN")

现在,如果我们的搜索查询变得复杂,例如:


    "query": 
        "filtered": 
            "query": 
                "query_string": 
                    "default_operator": "AND",
                    "query": "director.name:DAVID + \n movie.name:SEVEN"
                
            ,
            "filter": 
                "nested": 
                    "path": "movieStatus.boxoffice.status",
                    "query": 
                        "bool": 
                            "must": [
                                
                                    "match": 
                                        "movieStatus.boxoffice.status.rating": "A"
                                    
                                ,
                                
                                    "match": 
                                        "movieStatus.boxoffice.status.oscar": "false"
                                    
                                
                            ]
                        
                    
                
           
        
    

我仍然可以将该查询转换为内联弹性查询以将其与 esJsonRDD 一起使用吗?或者无论如何,上述查询仍然可以按原样esJsonRDD一起使用? 如果没有,在 Spark 中获取此类 RDD 的更好方法是什么?

因为 esJsonRDD 似乎只接受内联(一行)弹性查询。

【问题讨论】:

【参考方案1】:

使用三引号:

val query = """
"query": 
    "filtered": 
        "query": 
            "query_string": 
                "default_operator": "AND",
                "query": "director.name:DAVID + \n movie.name:SEVEN"
            
        ,
        "filter": 
            "nested": 
                "path": "movieStatus.boxoffice.status",
                "query": 
                    "bool": 
                        "must": [
                            
                                "match": 
                                    "movieStatus.boxoffice.status.rating": "A"
                                
                            ,
                            
                                "match": 
                                    "movieStatus.boxoffice.status.oscar": "false"
                                
                            
                        ]
                    
                
            
        
     
  
"""

val elasticRdds = sparkContext.esJsonRDD(esIndex, query)

【讨论】:

以上是关于使用 Spark 中的复杂过滤从 elasticsearch 中获取 esJsonRDD的主要内容,如果未能解决你的问题,请参考以下文章

无法过滤存储在 spark 2.2.0 数据框中的 CSV 列

推荐系统-协同过滤在Spark中的实现

从 pyspark 中的 HDFS 读取 70gb bson 文件然后将其索引到 Elastic 时出错

根据 CSV 记录从 Spark 数据帧中过滤一些数据

从 VPC 中的 Elastic Beanstalk 实例访问 RDS

过滤计数等于输入文件 rdd Spark 的列