如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引

Posted

技术标签:

【中文标题】如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引【英文标题】:How to query an Elasticsearch index using Pyspark and Dataframes 【发布时间】:2016-07-02 17:44:02 【问题描述】:

Elasticsaerch 的文档仅涉及将完整索引加载到 Spark。

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()

如何执行查询以从 Elasticsearch 索引返回数据并使用 pyspark 将它们作为 DataFrame 加载到 Spark?

【问题讨论】:

【参考方案1】:

下面是我的做法。

一般环境设置和命令:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar

代码:

from pyspark import SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

q ="""
  "query": 
    "filtered": 
      "filter": 
        "exists": 
          "field": "label"
        
      ,
      "query": 
        "match_all": 
      
    
  
"""

es_read_conf = 
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "titanic/passenger",
    "es.query" : q


es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()

您还可以定义数据框列。更多信息请参考Here。

希望对你有帮助!

【讨论】:

这就是我现在一直在做的事情,我希望有一种方法可以直接获取过滤后的 DataFrame 我不确定 ES-Hadoop Spark 连接器的最新 API 是否可行。 有没有办法也可以使用这个 API 将数据帧写入 elasticsearch? @ElesinOlalekanFuad 是的,有一种方法:elastic.co/guide/en/elasticsearch/hadoop/current/… 请注意,您必须从 Scala API 转换为 PySpark,但这并不难。a【参考方案2】:

我正在使用 pyspark 在 Amazon 的 EMR 集群中运行我的代码。然后,我使其工作的方式是按照以下步骤操作:

1) 将此引导操作放入集群创建中(创建 localhost elasticsearch 服务器):

s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb

2) 我运行这些命令来用一些数据填充弹性搜索数据库:

 curl -XPUT "http://localhost:9200/movies/movie/1" -d' 
   "title": "The Godfather",
   "director": "Francis Ford Coppola",
   "year": 1972
  '

如果您愿意,也可以运行其他 curl 命令,例如:

curl -XGET http://localhost:9200/_search?pretty=true&q='matchAll':''

3) 我使用以下参数启动了 pyspark:

pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar

我之前已经下载了 elasticsearch python 客户端

4) 我运行以下代码:

from pyspark import SparkConf
from pyspark.sql import SQLContext

q ="""
  "query": 
    "match_all": 
    
"""

es_read_conf = 
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "movies/movie",
    "es.query" : q


es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()

然后我终于从命令中得到了成功的结果。

【讨论】:

可以直接加载DataFrames:elastic.co/guide/en/elasticsearch/hadoop/current/…【参考方案3】:

在将地理过滤数据放入 PySpark DataFrame 时,我遇到了类似的问题。我将 elasticsearch-spark-20_2.11-5.2.2.jar 与 Spark 2.1.1 版和 ES 5.2 版一起使用。通过在创建 DataFrame 时将我的查询指定为选项,我能够将数据加载到 DataFrame 中

我的地理查询

q ="""
  "query": 
        "bool" : 
            "must" : 
                "match_all" : 
            ,
           "filter" : 
                "geo_distance" : 
                    "distance" : "100km",
                    "location" : 
                        "lat" : 35.825,
                        "lon" : -87.99
                    
                
            
        
    
"""

我使用以下命令将数据加载到DataFrame中

spark_df = spark.read.format("es").option("es.query", q).load("index_name")

这里有详细的 API:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-data-sources

【讨论】:

是的!见elastic.co/guide/en/elasticsearch/hadoop/current/…

以上是关于如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引的主要内容,如果未能解决你的问题,请参考以下文章

是否可以使用 pyspark 过滤 Spark DataFrames 以返回列值在列表中的所有行?

PySpark DataFrames - 使用不同类型的列之间的比较进行过滤

使用 Python 的 reduce() 加入多个 PySpark DataFrames

Pyspark DataFrames 中的嵌套 SELECT 查询

使用 pyspark 在循环中附加 Spark DataFrames 的有效方法

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe