SparkSQL read Elasticsearch ClassNotFoundException
Posted fansy1990
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL read Elasticsearch ClassNotFoundException相关的知识,希望对你有一定的参考价值。
环境
软件 | 版本 |
---|---|
Elasticsearch | 2.3.3 |
spark | cdh-1.6.0 |
idea | 2016 |
问题描述:
现在有一个Elasticsearch集群,需要使用SparkSQL直接连接进行读取,参考Elasticsearch官网代码,直接编写下面的程序进行读取(在Windows上编写程序,提交到远程Spark Standalone集群):
public class TestDirectReadForQuery
public static void main(String[] args)
String esTable = "twitter/doc"; //
String query = "select * from "+esTable+" limit 10";
HashMap<String, String> options = new HashMap<String, String>();
options.put("es.nodes", "192.168.0.78"); // ES环境IP之一
options.put("es.port", "9200"); // ES环境端口
options.put("es.read.metadata", "false");
options.put("es.mapping.date.rich", "false"); //必须,否则日期字段转换报错
SparkConf conf = new SparkConf();
conf.set("spark.master", "spark://server2.tipdm.com:7077");
conf.set("spark.app.name", "read es table ");
conf.set("spark.executor.memory","1g"); // 建议不小于z"1g"
conf.set("spark.executor.cores","2");
conf.set("spark.cores.max","2");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
String q = "?q=*:*"; // 支持queryString语法的过滤(若不使用过滤器则无需修改)
// ES中的索引名(即需要映射的表名),可通过访问http://10.16.15.8:9200/_plugin/head,了解珠江数码环境表命名方式、数据结构等
String sparkSqlTable = "tmp"+System.currentTimeMillis(); // 映射为sparkSql中的表名
DataFrame esDF = JavaEsSparkSQL.esDF(sqlContext, esTable, q, options);
esDF.registerTempTable(sparkSqlTable);
Row[] rows = sqlContext.sql(query.replace(esTable,sparkSqlTable)).collect();
System.out.println(Arrays.toString(rows));
问题解决
执行完成后,报下面的错误:
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.rdd.EsPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
从错误来看,应该是Classpath的原因,spark在执行查询的时候,需要拿到Elasticsearch-spark的class,所以需要把其jar包添加到Classpath中,比较方便的做法是:
SparkConf conf = new SparkConf();
conf.set("spark.master", "spark://server2.tipdm.com:7077");
conf.set("spark.app.name", "read es table ");
conf.set("spark.executor.memory","1g"); // 建议不小于z"1g"
conf.set("spark.executor.cores","2");
conf.set("spark.cores.max","2");
conf.set("spark.jars","/path/to/elasticsearch-spark_2.10-2.3.2.jar");
添加elasticsearch-spark_2.10-2.3.2.jar jar包所在路径即可。
以上是关于SparkSQL read Elasticsearch ClassNotFoundException的主要内容,如果未能解决你的问题,请参考以下文章