如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引
Posted
技术标签:
【中文标题】如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引【英文标题】:How to query an Elasticsearch index using Pyspark and Dataframes 【发布时间】:2016-07-02 17:44:02 【问题描述】:Elasticsaerch 的文档仅涉及将完整索引加载到 Spark。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()
如何执行查询以从 Elasticsearch 索引返回数据并使用 pyspark 将它们作为 DataFrame 加载到 Spark?
【问题讨论】:
【参考方案1】:下面是我的做法。
一般环境设置和命令:
export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2
./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar
代码:
from pyspark import SparkConf
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
q ="""
"query":
"filtered":
"filter":
"exists":
"field": "label"
,
"query":
"match_all":
"""
es_read_conf =
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : "titanic/passenger",
"es.query" : q
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_read_conf)
sqlContext.createDataFrame(es_rdd).collect()
您还可以定义数据框列。更多信息请参考Here。
希望对你有帮助!
【讨论】:
这就是我现在一直在做的事情,我希望有一种方法可以直接获取过滤后的 DataFrame 我不确定 ES-Hadoop Spark 连接器的最新 API 是否可行。 有没有办法也可以使用这个 API 将数据帧写入 elasticsearch? @ElesinOlalekanFuad 是的,有一种方法:elastic.co/guide/en/elasticsearch/hadoop/current/… 请注意,您必须从 Scala API 转换为 PySpark,但这并不难。a【参考方案2】:我正在使用 pyspark 在 Amazon 的 EMR 集群中运行我的代码。然后,我使其工作的方式是按照以下步骤操作:
1) 将此引导操作放入集群创建中(创建 localhost elasticsearch 服务器):
s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb
2) 我运行这些命令来用一些数据填充弹性搜索数据库:
curl -XPUT "http://localhost:9200/movies/movie/1" -d'
"title": "The Godfather",
"director": "Francis Ford Coppola",
"year": 1972
'
如果您愿意,也可以运行其他 curl 命令,例如:
curl -XGET http://localhost:9200/_search?pretty=true&q='matchAll':''
3) 我使用以下参数启动了 pyspark:
pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar
我之前已经下载了 elasticsearch python 客户端
4) 我运行以下代码:
from pyspark import SparkConf
from pyspark.sql import SQLContext
q ="""
"query":
"match_all":
"""
es_read_conf =
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : "movies/movie",
"es.query" : q
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_read_conf)
sqlContext.createDataFrame(es_rdd).collect()
然后我终于从命令中得到了成功的结果。
【讨论】:
可以直接加载DataFrames:elastic.co/guide/en/elasticsearch/hadoop/current/…【参考方案3】:在将地理过滤数据放入 PySpark DataFrame 时,我遇到了类似的问题。我将 elasticsearch-spark-20_2.11-5.2.2.jar 与 Spark 2.1.1 版和 ES 5.2 版一起使用。通过在创建 DataFrame 时将我的查询指定为选项,我能够将数据加载到 DataFrame 中
我的地理查询
q ="""
"query":
"bool" :
"must" :
"match_all" :
,
"filter" :
"geo_distance" :
"distance" : "100km",
"location" :
"lat" : 35.825,
"lon" : -87.99
"""
我使用以下命令将数据加载到DataFrame中
spark_df = spark.read.format("es").option("es.query", q).load("index_name")
这里有详细的 API:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-data-sources
【讨论】:
是的!见elastic.co/guide/en/elasticsearch/hadoop/current/…以上是关于如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引的主要内容,如果未能解决你的问题,请参考以下文章
是否可以使用 pyspark 过滤 Spark DataFrames 以返回列值在列表中的所有行?
PySpark DataFrames - 使用不同类型的列之间的比较进行过滤
使用 Python 的 reduce() 加入多个 PySpark DataFrames
Pyspark DataFrames 中的嵌套 SELECT 查询
使用 pyspark 在循环中附加 Spark DataFrames 的有效方法
在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe