Spark 与 Elasticsearch交互的一些配置和问题解决

Posted 陈小龙

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 与 Elasticsearch交互的一些配置和问题解决相关的知识,希望对你有一定的参考价值。

最近刚开始接触大数据,一个日志分析系统,需要用Spark开发,Elasticsearch作为数据库来使用。所以第一步要解决的就是怎么从Spark去取Elasticsearch上的数据,下面是软件的版本信息。(基本原则是开发和集群的版本都要一致)

  • 开发环境 

    • jdk: 1.8.0_91

    • scala: 2.11.8

    • spark: 2.1.0

    • IntelliJ IDEA 2017.1.1(集成开发环境)

  • 集群环境 

    • jdk: 1.8.0_91

    • scala: 2.11.8

    • hadoop-2.7.3

    • spark-2.1.0-bin-hadoop2.7

    • elasticsearch-5.2.0

开发环境中有两种方法来管理依赖包:SBT和Maven

SBT方式

build.sbt文件

name := "SparkTest"version := "1.0"scalaVersion := "2.11.8"libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.3"libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.7.3"libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.3"libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0"libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.1.0"libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0"libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.1.0"
 
   
   
 
  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

  • 10

  • 11

  • 12

  • 13

  • 14

  • 15

  • 16

  • 17

  • 18

  • 19

  • 20

  • 21

还有一种写法是这样的

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
 
   
   
 
  • 1

就是不指定具体的scala版本,根据scalaVersion去自动加载,*_2.11指的就是scala2.11的版本,后面的”2.1.0”才是spark的版本。

Maven方式

pom.xml文件

...<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.3</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
     </dependency>
    ...
 
   
   
 
  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

  • 10

  • 11

  • 12

  • 13

  • 14

  • 15

  • 16

  • 17

  • 18

  • 19

  • 20

  • 21

  • 22

  • 23

  • 24

  • 25

  • 26

  • 27

  • 28

  • 29

  • 30

  • 31

  • 32

  • 33

  • 34

  • 35

  • 36

  • 37

  • 38

  • 39

  • 40

  • 41

  • 42

  • 43

大家可以看到,两种方法只是格式不同,具体的依赖包和写法可以看Maven 仓库网站 


大家会发现上面两种方式并没有引入ES-Hadoop依赖包,那是因为我引入的时候IDEA会报错,不知何原因,所以我选择了手动引入。(大家也可以根据上面的两种方式引入看看是否会报错)

ES-Hadoop依赖包官方下载 
(现在最新的是5.3.0,可以根据集群的Elasticsearch版本下载对应的版本) 

在工程的Project Structure(快捷键:选中工程F4)中添加elasticsearch-hadoop-5.2.0.jar包 


好了,开发环境的依赖包准备完毕,下面就开始coding了

Elasticsearch for Apache Hadoop官方技术文档(Spark部分) 
(这里面写的超级简单,只是几句代码,对于我这样刚入门的小白来说还是没什么作用。。。)

所以还是给出我自己的Spark从Elasticsearch取数据的代码

def main(args: Array[String]): Unit = {

    // 屏蔽控制台Log
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // 生成SparkSession对象
    val spark = SparkSession.builder()      .appName("SparkTest")      .master("spark://utm136:7077")      .getOrCreate()

    // 连接ES节点的信息
    val options = Map("es.index.auto.create" -> "true", "pushdown" -> "true", "es.nodes" -> "172.16.4.90","es.port" -> "9200")

    // 根据"索引/类型"(spark/docs)加载数据
    val sparkDF = spark.sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("spark/docs")

    // 控制台打印出message字段的内容
    sparkDF.select("message").collect().foreach(println(_))
  }
 
   
   
 
  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

  • 10

  • 11

  • 12

  • 13

  • 14

  • 15

  • 16

  • 17

  • 18

  • 19

  • 20

  • 21

问题解决

当集群的各个节点spark-2.1.0-bin-hadoop2.7/jars目录下没有放入elasticsearch-hadoop-5.2.0.jar包时,会报java.lang.ClassNotFoundException: org.elasticsearch.spark.rdd.EsPartition错误

于是将elasticsearch-hadoop-5.2.0.jar放入各个节点的spark-2.1.0-bin-hadoop2.7/jars目录,再编译运行代码,会报 
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class错误

就是这个错误困扰了我好久好久,查了好多网站,说是什么Spark编译的版本和运行时的版本不一致啦,Spark2.0以后的版本要对应scala2.11以后的版本等等说法,这些我都尝试了,问题仍然没有解决。

最终解决方法: 
将elasticsearch-hadoop-5.2.0/dist/elasticsearch-spark-20_2.11-5.2.0.jar包(上面截图中红框框起来的包,注意scala版本)放入到集群各节点的spark-2.1.0-bin-hadoop2.7/jars目录下,而不是elasticsearch-hadoop-5.2.0.jar包!

参考:https://github.com/elastic/elasticsearch-hadoop/issues/862


以上是关于Spark 与 Elasticsearch交互的一些配置和问题解决的主要内容,如果未能解决你的问题,请参考以下文章

Spark-Cassandra 与 Spark-Elasticsearch

如何将 Spark EMR 集群与 AWS elasticsearch 集群连接起来

Elasticsearch-hadoop & Elasticsearch-spark sql - 语句跟踪扫描&滚动

PHP如何与搜索引擎Elasticsearch交互?

如何使用python将Spark数据写入ElasticSearch

Spark on Elasticsearch一致性问题的探索