Spark和Elasticsearch交互
Posted 数据科学和工程
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark和Elasticsearch交互相关的知识,希望对你有一定的参考价值。
目录
背景
第一部分 环境依赖
第二部分 交互接口
第三部分 任务提交
参考文献及资料
背景
为了更好的支持Spark应用和Elasticsearch
交互,Elasticsearch
官方推出了elasticsearch-hadoop
项目。本文将详细介绍Spark Java应用和Elasticsearch
的交互细节。
第一部分 环境依赖
1.1 配置Maven依赖
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-13_2.11</artifactId>
<version>6.8.2</version>
</dependency>
需要注意Spark版本和elasticsearch-hadoop
版本的兼容性,参考版本对照表:
Spark Version |
Scala Version |
ES-Hadoop Artifact ID |
---|---|---|
1.0 - 1.2 | 2.10 | <unsupported> |
1.0 - 1.2 | 2.11 | <unsupported> |
1.3 - 1.6 | 2.10 | elasticsearch-spark-13_2.10 |
1.3 - 1.6 | 2.11 | elasticsearch-spark-13_2.11 |
2.0+ | 2.10 | elasticsearch-spark-20_2.10 |
2.0+ | 2.11 | elasticsearch-spark-20_2.11 |
1.2 Spark配置
关于elasticsearch
集群的交互配置,定义在SparkConf
中,例如下面的案例:
import org.apache.spark.SparkConf;
SparkConf sparkConf = new SparkConf().setAppName("JavaSpark").setMaster("local");
//config elasticsearch
sparkConf.set("es.nodes","192.168.31.3:9200");
sparkConf.set("es.port","9200");
sparkConf.set("es.index.auto.create","true");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
es.nodes
,集群节点;es.port
,服务端口;es.index.auto.create
,参数指定index
是否自动创建;
其他配置参考官方文档:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html
第二部分 交互接口
2.1 自定义id的写入
在业务数据写入elasticsearch
集群的时候,需要数据去重。这时候就需要自己指定元数据字段中的_id
。elasticsearch
在处理_id
相同的数据时,会覆盖写入。例如下面的例子:
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.Metadata;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashMap;
import static org.elasticsearch.spark.rdd.Metadata.ID;
try{
ArrayList<Tuple2<HashMap,HashMap>> metaList = new ArrayList<>();
for(int i=0;i<100;i++) {
HashMap<String, String> map = new HashMap<String, String>();
map.put("id", String.valueOf(i));
map.put("name", "one");
HashMap<Metadata, String> metamap = new HashMap<Metadata, String>();
metamap.put(ID, String.valueOf(i));
metaList.add(new Tuple2(metamap, map));
}
JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(metaList);
JavaEsSpark.saveToEsWithMeta(pairRdd,"spark/doc");
}catch (Exception e){
e.printStackTrace();
System.out.println("finish!");
jsc.stop();
}
例子中我们使用ArrayList<Tuple2<HashMap,HashMap>>
数据结构来存储待写入的数据,然后构造RDD
,最后使用JavaEsSpark.saveToEsWithMeta
方法写入。需要注意这里构造的两个HashMap
:
数据
HashMap
,数据结构为:HashMap<String, String>
,用于存储数据键值对。元数据
HashMap
,数据结构为:HashMap<Metadata, String>
,用于存储元数据键值对。例如ID
即为_id
。
其他类型读写可以参考官方网站:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html
第三部分 任务提交
最后编译运行。主要是setMaster()
指定运行方式,分为如下几种。
运行模式 | 说明 |
---|---|
local |
Run Spark locally with one worker thread (i.e. no parallelism at all). |
local[K] |
Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). |
local[*] |
Run Spark locally with as many worker threads as logical cores on your machine. |
spark://HOST:PORT |
Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default. |
mesos://HOST:PORT |
Connect to the given Mesos cluster. The port must be whichever one your is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use mesos://zk://... . To submit with --deploy-mode cluster , the HOST:PORT should be configured to connect to the MesosClusterDispatcher. |
yarn |
Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode . The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable. |
yarn-client |
Equivalent to yarn with --deploy-mode client , which is preferred to yarn-client |
yarn-cluster |
Equivalent to yarn with --deploy-mode cluster , which is preferred to yarn-cluster |
除了在eclipse、Intellij中运行local模式的任务,也可以打成jar包,使用spark-submit来进行任务提交。
参考文献及资料
1、 Apache Spark support,链接:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html
2、elasticsearch-hadoop项目,链接:https://github.com/elastic/elasticsearch-hadoop
以上是关于Spark和Elasticsearch交互的主要内容,如果未能解决你的问题,请参考以下文章
在这个 spark 代码片段中 ordering.by 是啥意思?
elasticsearch代码片段,及工具类SearchEsUtil.java
ElasticSearch学习问题记录——Invalid shift value in prefixCoded bytes (is encoded value really an INT?)(代码片段
spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段