RDD 不可序列化 Cassandra/Spark 连接器 java API
Posted
技术标签:
【中文标题】RDD 不可序列化 Cassandra/Spark 连接器 java API【英文标题】:RDD not serializable Cassandra/Spark connector java API 【发布时间】:2014-11-16 21:39:09 【问题描述】:所以我之前有一些关于如何在 java maven 项目中使用 spark 查询 cassandra 的问题:Querying Data in Cassandra via Spark in a Java Maven Project
嗯,我的问题得到了回答,并且有效,但是我遇到了一个问题(可能是一个问题)。我现在正在尝试使用 datastax java API。这是我的代码:
package com.angel.testspark.test2;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.io.Serializable;
import static com.datastax.spark.connector.CassandraJavaUtil.*;
public class App
// firstly, we define a bean class
public static class Person implements Serializable
private Integer id;
private String fname;
private String lname;
private String role;
// Remember to declare no-args constructor
public Person()
public Integer getId() return id;
public void setId(Integer id) this.id = id;
public String getfname() return fname;
public void setfname(String fname) this.fname = fname;
public String getlname() return lname;
public void setlname(String lname) this.lname = lname;
public String getrole() return role;
public void setrole(String role) this.role = role;
// other methods, constructors, etc.
private transient SparkConf conf;
private App(SparkConf conf)
this.conf = conf;
private void run()
JavaSparkContext sc = new JavaSparkContext(conf);
createSchema(sc);
sc.stop();
private void createSchema(JavaSparkContext sc)
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
.where("role=?", "IT Engineer").map(new Function<Person, String>()
@Override
public String call(Person person) throws Exception
return person.toString();
);
System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
public static void main( String[] args )
if (args.length != 2)
System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
System.exit(1);
SparkConf conf = new SparkConf();
conf.setAppName("Java API demo");
conf.setMaster(args[0]);
conf.set("spark.cassandra.connection.host", args[1]);
App app = new App(conf);
app.run();
这是我的错误:
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.angel.testspark.test2.App
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:781)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
现在我知道我的错误在哪里了。它是System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
,因为我需要将 rdd 转换为数组。但是,API 文档说我应该能够做到这一点……这是从文档中复制和粘贴的代码。为什么我不能将 RDD 序列化为数组?
我已经使用上面链接中包含的帖子中的插入将虚拟数据插入到我的 cassandra 中。
另外,我之前解决的一个错误是我将所有的 getter 和 setter 都更改为小写。当我在其中使用大写字母时,会产生错误。为什么我不能在我的 getter 和 setter 中使用大写字母?
谢谢, 天使
【问题讨论】:
【参考方案1】:将public class App
更改为public class App implements Serializable
应该可以修复错误。因为 java 内部类将保留对外部类的引用,所以您的 Function
对象将具有对 App
的引用。由于 Spark 需要序列化您的 Function
对象,它要求 App
也是可序列化的。
【讨论】:
谢谢!那行得通。您对为什么此声明不起作用有任何见解吗?JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class).where("role=?", "IT Engineer").map(new Function<Person, String>()
如果我离开 .where(),它会给我一个错误,但如果我删除它并离开 .map,整个代码都可以工作。据记录, .where 应该可以工作
我在上面发布了另一个问题,但已经回答了***.com/questions/26001566/…。谢谢!以上是关于RDD 不可序列化 Cassandra/Spark 连接器 java API的主要内容,如果未能解决你的问题,请参考以下文章
Scala:RDD映射中的任务不可序列化由json4s“隐式val格式= DefaultFormats”引起
使用 Calliope 库的 Spark Cassandra 集成 - 不显示任何记录
Spark Cassandra 连接器找不到 java.time.LocalDate