RDD 不可序列化 Cassandra/Spark 连接器 java API

Posted

技术标签:

【中文标题】RDD 不可序列化 Cassandra/Spark 连接器 java API【英文标题】:RDD not serializable Cassandra/Spark connector java API 【发布时间】:2014-11-16 21:39:09 【问题描述】:

所以我之前有一些关于如何在 java maven 项目中使用 spark 查询 cassandra 的问题:Querying Data in Cassandra via Spark in a Java Maven Project

嗯,我的问题得到了回答,并且有效,但是我遇到了一个问题(可能是一个问题)。我现在正在尝试使用 datastax java API。这是我的代码:

package com.angel.testspark.test2;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App 


    // firstly, we define a bean class
    public static class Person implements Serializable 
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person()  

        public Integer getId()  return id; 
        public void setId(Integer id)  this.id = id; 

        public String getfname()  return fname; 
        public void setfname(String fname)  this.fname = fname; 

        public String getlname()  return lname; 
        public void setlname(String lname)  this.lname = lname; 

        public String getrole()  return role; 
        public void setrole(String role)  this.role = role; 

        // other methods, constructors, etc.
    

    private transient SparkConf conf;
    private App(SparkConf conf) 
        this.conf = conf;
    


    private void run() 
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    

    private void createSchema(JavaSparkContext sc) 

        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() 
                    @Override
                    public String call(Person person) throws Exception 
                        return person.toString();
                    
                );
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
               



    public static void main( String[] args )
    
        if (args.length != 2) 
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    

这是我的错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.angel.testspark.test2.App
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:781)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

现在我知道我的错误在哪里了。它是System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));,因为我需要将 rdd 转换为数组。但是,API 文档说我应该能够做到这一点……这是从文档中复制和粘贴的代码。为什么我不能将 RDD 序列化为数组?

我已经使用上面链接中包含的帖子中的插入将虚拟数据插入到我的 cassandra 中。

另外,我之前解决的一个错误是我将所有的 getter 和 setter 都更改为小写。当我在其中使用大写字母时,会产生错误。为什么我不能在我的 getter 和 setter 中使用大写字母?

谢谢, 天使

【问题讨论】:

【参考方案1】:

public class App 更改为public class App implements Serializable 应该可以修复错误。因为 java 内部类将保留对外部类的引用,所以您的 Function 对象将具有对 App 的引用。由于 Spark 需要序列化您的 Function 对象,它要求 App 也是可序列化的。

【讨论】:

谢谢!那行得通。您对为什么此声明不起作用有任何见解吗? JavaRDD&lt;String&gt; rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class).where("role=?", "IT Engineer").map(new Function&lt;Person, String&gt;() 如果我离开 .where(),它会给我一个错误,但如果我删除它并离开 .map,整个代码都可以工作。据记录, .where 应该可以工作 我在上面发布了另一个问题,但已经回答了***.com/questions/26001566/…。谢谢!

以上是关于RDD 不可序列化 Cassandra/Spark 连接器 java API的主要内容,如果未能解决你的问题,请参考以下文章

Scala:RDD映射中的任务不可序列化由json4s“隐式val格式= DefaultFormats”引起

使用 Calliope 库的 Spark Cassandra 集成 - 不显示任何记录

Spark Cassandra 连接器找不到 java.time.LocalDate

Cassandra Spark 写入缓慢

org.apache.spark.SparkException:任务不可序列化,wh

Cassandra&Spark:我可以将项目添加到行以从行列表创建数据框