Livy Java API error: org.apache.livy.shaded.kryo.kryo.KryoException: Unable to find class: com.xxx.wor

Livy Java API

Dependencies

<dependency>
  <groupId>org.apache.livy</groupId>
  <artifactId>livy-client-http</artifactId>
  <version>0.5.0-incubating</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.2</version>
</dependency>

Job code

package com.czlan.wordcount;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.livy.Job;
import org.apache.livy.JobContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class WordCountJavaSpark implements Job<Object> {
    /**
     * call() holds the job logic that Livy runs inside the remote Spark context.
     * @param jobContext gives access to the shared JavaSparkContext
     * @return the word counts as a Map
     * @throws Exception
     */
    @Override
    public Object call(JobContext jobContext) throws Exception {
        JavaSparkContext sc = jobContext.sc();
        Map<String, Integer> counts = new HashMap<String, Integer>();

        // To read an HDFS HA nameservice path here, HADOOP_CONF_DIR must be
        // configured in Livy's livy-env.sh (see the sketch after this class)
        JavaRDD<String> lines = sc.textFile("hdfs://myha/data/livy/zookeeper.out");

        // Split each line into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts for each word
        JavaPairRDD<String, Integer> reduced = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });

        // collect() brings the results back to the driver; copy them into a
        // plain Map so they can be returned to the Livy client
        reduced.collect().forEach((tuple2) -> {
            System.out.println(tuple2._1 + "<===>" + tuple2._2);
            counts.put(tuple2._1, tuple2._2);
        });

        return counts;
    }
}
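
As noted in the comment on sc.textFile(), reading an HDFS HA nameservice path such as hdfs://myha/... only works if the Livy server can find the Hadoop client configuration. A minimal livy-env.sh sketch; the directory below is an assumption and should point to wherever core-site.xml and hdfs-site.xml actually live on the Livy host:

# livy-env.sh on the Livy server
# Assumed config location; adjust to the actual Hadoop client config directory
export HADOOP_CONF_DIR=/etc/hadoop/conf

Restart Livy after changing it so the new environment takes effect.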

Launcher program

package com.czlan.wordcount;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;

import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;

public class StartApp {
    private static LivyClient client = null;

    public static void main(String[] args) {

        // Livy server address
        String livyURI = "http://192.168.128.100:8998";

        // Location of the jar that contains the job class
        String file = "/Users/chenzhuanglan/WorkSpace/livyjavatest/out/artifacts/livyWC/WordCountJavaSpark.jar";

        try {
            client = new LivyClientBuilder().setURI(new URI(livyURI)).build();

            System.err.printf("Uploading %s to the Spark context...%n", file);
            // Upload the Spark job jar to the server and add it to the
            // application classpath
            client.uploadJar(new File(file)).get();

            System.err.printf("Running WordCountJavaSpark ...%n");
            // Submit the job and block until its result comes back
            HashMap<String, Integer> map = (HashMap<String, Integer>) client.submit(new WordCountJavaSpark()).get();

            map.forEach((k, v) -> {
                System.out.println(k + "===" + v);
            });

        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            if (client != null) {
                // Shut down the remote Spark context and close the Livy session
                client.stop(true);
            }
        }
    }
}

Note, note, note!

log4j:WARN No appenders could be found for logger (org.apache.livy.shaded.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Uploading /Users/chenzhuanglan/WorkSpace/testwcjava/target/testwcjava-1.0-SNAPSHOT.jar to the Spark context...
Running WordCountJavaSpark ...
java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.livy.shaded.kryo.kryo.KryoException: Unable to find class: com.czlan.wordcount.WordCountJavaSpark
org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
org.apache.livy.shaded.kryo.kryo.Kryo.readClass(Kryo.java:656)
org.apache.livy.shaded.kryo.kryo.Kryo.readClassAndObject(Kryo.java:767)
org.apache.livy.client.common.Serializer.deserialize(Serializer.java:63)
org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:39)
org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27)
org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:57)
org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:42)
org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:27)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
    at org.apache.livy.client.http.JobHandleImpl.get(JobHandleImpl.java:198)
    at org.apache.livy.client.http.JobHandleImpl.get(JobHandleImpl.java:88)
    at com.czlan.wordcount.StartApp2.main(StartApp2.java:32)

The error above occurred because client.uploadJar(new File(file)).get(); had been written as client.uploadFile(new File(file)).get();
uploadJar uploads a jar that is added to the Spark application's classpath.
uploadFile uploads a file that is merely passed to the Spark application as a resource.
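
To make the fix concrete, here is the offending call next to the correct one; both methods belong to the LivyClient already used in StartApp above:

// Wrong: the jar is shipped only as a data file, so the remote driver's
// classpath does not contain com.czlan.wordcount.WordCountJavaSpark and the
// shaded Kryo deserializer cannot find the class when the job is deserialized.
// client.uploadFile(new File(file)).get();

// Right: the jar is added to the Spark application's classpath, so the
// submitted job can be deserialized and executed.
client.uploadJar(new File(file)).get();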



