Spark 与 Elasticsearch交互的一些配置和问题解决
Posted 陈小龙
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 与 Elasticsearch交互的一些配置和问题解决相关的知识,希望对你有一定的参考价值。
最近刚开始接触大数据,一个日志分析系统,需要用Spark开发,Elasticsearch作为数据库来使用。所以第一步要解决的就是怎么从Spark去取Elasticsearch上的数据,下面是软件的版本信息。(基本原则是开发和集群的版本都要一致)
开发环境
jdk: 1.8.0_91
scala: 2.11.8
spark: 2.1.0
IntelliJ IDEA 2017.1.1(集成开发环境)
集群环境
jdk: 1.8.0_91
scala: 2.11.8
hadoop-2.7.3
spark-2.1.0-bin-hadoop2.7
elasticsearch-5.2.0
开发环境中有两种方法来管理依赖包:SBT和Maven
SBT方式
build.sbt文件
name := "SparkTest"version := "1.0"scalaVersion := "2.11.8"libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.3"libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.7.3"libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.3"libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0"libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.1.0"libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0"libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.1.0"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
还有一种写法是这样的
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
1
就是不指定具体的scala版本,根据scalaVersion去自动加载,*_2.11指的就是scala2.11的版本,后面的”2.1.0”才是spark的版本。
Maven方式
pom.xml文件
...<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
</dependency>
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
大家可以看到,两种方法只是格式不同,具体的依赖包和写法可以看Maven 仓库网站
大家会发现上面两种方式并没有引入ES-Hadoop依赖包,那是因为我引入的时候IDEA会报错,不知何原因,所以我选择了手动引入。(大家也可以根据上面的两种方式引入看看是否会报错)
ES-Hadoop依赖包官方下载
(现在最新的是5.3.0,可以根据集群的Elasticsearch版本下载对应的版本)
在工程的Project Structure(快捷键:选中工程F4)中添加elasticsearch-hadoop-5.2.0.jar包
好了,开发环境的依赖包准备完毕,下面就开始coding了
Elasticsearch for Apache Hadoop官方技术文档(Spark部分)
(这里面写的超级简单,只是几句代码,对于我这样刚入门的小白来说还是没什么作用。。。)
所以还是给出我自己的Spark从Elasticsearch取数据的代码
def main(args: Array[String]): Unit = {
// 屏蔽控制台Log
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// 生成SparkSession对象
val spark = SparkSession.builder() .appName("SparkTest") .master("spark://utm136:7077") .getOrCreate()
// 连接ES节点的信息
val options = Map("es.index.auto.create" -> "true", "pushdown" -> "true", "es.nodes" -> "172.16.4.90","es.port" -> "9200")
// 根据"索引/类型"(spark/docs)加载数据
val sparkDF = spark.sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("spark/docs")
// 控制台打印出message字段的内容
sparkDF.select("message").collect().foreach(println(_))
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
问题解决
当集群的各个节点spark-2.1.0-bin-hadoop2.7/jars目录下没有放入elasticsearch-hadoop-5.2.0.jar包时,会报java.lang.ClassNotFoundException: org.elasticsearch.spark.rdd.EsPartition错误
于是将elasticsearch-hadoop-5.2.0.jar放入各个节点的spark-2.1.0-bin-hadoop2.7/jars目录,再编译运行代码,会报
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class错误
就是这个错误困扰了我好久好久,查了好多网站,说是什么Spark编译的版本和运行时的版本不一致啦,Spark2.0以后的版本要对应scala2.11以后的版本等等说法,这些我都尝试了,问题仍然没有解决。
最终解决方法:
将elasticsearch-hadoop-5.2.0/dist/elasticsearch-spark-20_2.11-5.2.0.jar包(上面截图中红框框起来的包,注意scala版本)放入到集群各节点的spark-2.1.0-bin-hadoop2.7/jars目录下,而不是elasticsearch-hadoop-5.2.0.jar包!
参考:https://github.com/elastic/elasticsearch-hadoop/issues/862
以上是关于Spark 与 Elasticsearch交互的一些配置和问题解决的主要内容,如果未能解决你的问题,请参考以下文章
Spark-Cassandra 与 Spark-Elasticsearch
如何将 Spark EMR 集群与 AWS elasticsearch 集群连接起来
Elasticsearch-hadoop & Elasticsearch-spark sql - 语句跟踪扫描&滚动