SparkSQL read Elasticsearch ClassNotFoundException

Posted fansy1990

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL read Elasticsearch ClassNotFoundException相关的知识,希望对你有一定的参考价值。

环境

软件版本
Elasticsearch2.3.3
sparkcdh-1.6.0
idea2016

问题描述:

现在有一个Elasticsearch集群,需要使用SparkSQL直接连接进行读取,参考Elasticsearch官网代码,直接编写下面的程序进行读取(在Windows上编写程序,提交到远程Spark Standalone集群):

public class TestDirectReadForQuery 
    public static void main(String[] args)
        String esTable = "twitter/doc"; //
        String query = "select * from "+esTable+" limit 10";

        HashMap<String, String> options = new HashMap<String, String>();
        options.put("es.nodes", "192.168.0.78"); // ES环境IP之一
        options.put("es.port", "9200"); // ES环境端口
        options.put("es.read.metadata", "false");
        options.put("es.mapping.date.rich", "false"); //必须,否则日期字段转换报错

        SparkConf conf = new SparkConf();
        conf.set("spark.master", "spark://server2.tipdm.com:7077");
        conf.set("spark.app.name", "read es table ");
        conf.set("spark.executor.memory","1g"); // 建议不小于z"1g"
        conf.set("spark.executor.cores","2");
        conf.set("spark.cores.max","2");

        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        String q = "?q=*:*"; // 支持queryString语法的过滤(若不使用过滤器则无需修改)
      //   ES中的索引名(即需要映射的表名),可通过访问http://10.16.15.8:9200/_plugin/head,了解珠江数码环境表命名方式、数据结构等
        String sparkSqlTable = "tmp"+System.currentTimeMillis(); // 映射为sparkSql中的表名
        DataFrame esDF = JavaEsSparkSQL.esDF(sqlContext, esTable, q, options);
        esDF.registerTempTable(sparkSqlTable);

        Row[] rows = sqlContext.sql(query.replace(esTable,sparkSqlTable)).collect();
        System.out.println(Arrays.toString(rows));

    

问题解决

执行完成后,报下面的错误:

Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.rdd.EsPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)

从错误来看,应该是Classpath的原因,spark在执行查询的时候,需要拿到Elasticsearch-spark的class,所以需要把其jar包添加到Classpath中,比较方便的做法是:

        SparkConf conf = new SparkConf();
        conf.set("spark.master", "spark://server2.tipdm.com:7077");
        conf.set("spark.app.name", "read es table ");
        conf.set("spark.executor.memory","1g"); // 建议不小于z"1g"
        conf.set("spark.executor.cores","2");
        conf.set("spark.cores.max","2");
        conf.set("spark.jars","/path/to/elasticsearch-spark_2.10-2.3.2.jar");

添加elasticsearch-spark_2.10-2.3.2.jar jar包所在路径即可。

以上是关于SparkSQL read Elasticsearch ClassNotFoundException的主要内容,如果未能解决你的问题,请参考以下文章

SparkSQL

sparksql

spark 怎么去连接 ElasticSearch

入门大数据---SparkSQL联结操作

SparkSQL - 直接读取镶木地板文件

入门大数据---SparkSQL常用聚合函数