Spark和Elasticsearch交互

Posted 数据科学和工程

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark和Elasticsearch交互相关的知识,希望对你有一定的参考价值。

目录

  • 背景

  • 第一部分  环境依赖

  • 第二部分 交互接口

  • 第三部分 任务提交

  • 参考文献及资料

背景

为了更好的支持Spark应用和Elasticsearch交互,Elasticsearch官方推出了elasticsearch-hadoop项目。本文将详细介绍Spark Java应用和Elasticsearch的交互细节。

第一部分 环境依赖

1.1 配置Maven依赖

 <dependency>
   <groupId>org.elasticsearch</groupId>
   <artifactId>elasticsearch-spark-13_2.11</artifactId>
   <version>6.8.2</version>
 </dependency>

需要注意Spark版本和elasticsearch-hadoop版本的兼容性,参考版本对照表:

Spark Version Scala Version ES-Hadoop Artifact ID
1.0 - 1.2 2.10 <unsupported>
1.0 - 1.2 2.11 <unsupported>
1.3 - 1.6 2.10 elasticsearch-spark-13_2.10
1.3 - 1.6 2.11 elasticsearch-spark-13_2.11
2.0+ 2.10 elasticsearch-spark-20_2.10
2.0+ 2.11 elasticsearch-spark-20_2.11

1.2 Spark配置

关于elasticsearch集群的交互配置,定义在SparkConf中,例如下面的案例:

 import org.apache.spark.SparkConf;
 
 SparkConf sparkConf = new SparkConf().setAppName("JavaSpark").setMaster("local");
 //config elasticsearch
 sparkConf.set("es.nodes","192.168.31.3:9200");
 sparkConf.set("es.port","9200");
 sparkConf.set("es.index.auto.create","true");
 
 JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  • es.nodes,集群节点;

  • es.port,服务端口;

  • es.index.auto.create,参数指定index是否自动创建;

其他配置参考官方文档:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html

第二部分 交互接口

2.1 自定义id的写入

在业务数据写入elasticsearch集群的时候,需要数据去重。这时候就需要自己指定元数据字段中的_idelasticsearch在处理_id相同的数据时,会覆盖写入。例如下面的例子:

 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.elasticsearch.spark.rdd.Metadata;
 import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
 import scala.Tuple2;
 import java.util.ArrayList;
 import java.util.HashMap;
 import static org.elasticsearch.spark.rdd.Metadata.ID;
 
 
 try{
       ArrayList<Tuple2<HashMap,HashMap>> metaList = new ArrayList<>();
     
       for(int i=0;i<100;i++) {
          HashMap<String, String> map = new HashMap<String, String>();
          map.put("id", String.valueOf(i));
          map.put("name", "one");
 
          HashMap<Metadata, String> metamap = new HashMap<Metadata, String>();
          metamap.put(ID, String.valueOf(i));
 
          metaList.add(new Tuple2(metamap, map));
        }
 
       JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(metaList);
       JavaEsSpark.saveToEsWithMeta(pairRdd,"spark/doc");
 
      }catch (Exception e){
         e.printStackTrace();
         System.out.println("finish!");
         jsc.stop();
  }

例子中我们使用ArrayList<Tuple2<HashMap,HashMap>>数据结构来存储待写入的数据,然后构造RDD,最后使用JavaEsSpark.saveToEsWithMeta方法写入。需要注意这里构造的两个HashMap:

  • 数据HashMap,数据结构为:HashMap<String, String>,用于存储数据键值对。

  • 元数据HashMap,数据结构为:HashMap<Metadata, String>,用于存储元数据键值对。例如ID即为_id

其他类型读写可以参考官方网站:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

第三部分 任务提交

最后编译运行。主要是setMaster()指定运行方式,分为如下几种。

运行模式 说明
local Run Spark locally with one worker thread (i.e. no parallelism at all).
local[K] Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
local[*] Run Spark locally with as many worker threads as logical cores on your machine.
spark://HOST:PORT Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
mesos://HOST:PORT Connect to the given Mesos cluster. The port must be whichever one your is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use mesos://zk://.... To submit with --deploy-mode cluster, the HOST:PORT should be configured to connect to the MesosClusterDispatcher.
yarn Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.
yarn-client Equivalent to yarn with --deploy-mode client, which is preferred to yarn-client
yarn-cluster Equivalent to yarn with --deploy-mode cluster, which is preferred to yarn-cluster

除了在eclipse、Intellij中运行local模式的任务,也可以打成jar包,使用spark-submit来进行任务提交。


参考文献及资料

1、 Apache Spark support,链接:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

2、elasticsearch-hadoop项目,链接:https://github.com/elastic/elasticsearch-hadoop


以上是关于Spark和Elasticsearch交互的主要内容,如果未能解决你的问题,请参考以下文章

在这个 spark 代码片段中 ordering.by 是啥意思?

elasticsearch代码片段,及工具类SearchEsUtil.java

python+spark程序代码片段

ElasticSearch学习问题记录——Invalid shift value in prefixCoded bytes (is encoded value really an INT?)(代码片段

spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段

ElasticSearch实战(四十五)-Spark 大数据离线同步方案