无法在 Spark 中合并两个 CassandraJavaRDD<CassandraRow>
Posted
技术标签:
【中文标题】无法在 Spark 中合并两个 CassandraJavaRDD<CassandraRow>【英文标题】:Can not union two CassandraJavaRDD<CassandraRow> in Spark 【发布时间】:2016-03-07 13:12:24 【问题描述】:由于从 Cassandra 查询数据有限制,我正在尝试使用 Spark 批量读取数据并将其存储在 RDD 中。
然后我添加所有的 RDD ,使用联合函数。
这是我的代码。
private void getDataFromCassandra(JavaSparkContext sc)
CassandraJavaRDD<CassandraRow> cassandraRDD = null ;
CassandraJavaRDD<CassandraRow> cassandraRDD2 = null;
While(Some Condition)
cassandraRDD = CassandraJavaUtil
.javaFunctions(sc).cassandraTable("dmp", "table").select("abc", "xyz")
.where("pid IN ('" + sb + "')");
if(cassandraRDD2==null)
cassandraRDD2=cassandraRDD;
else
cassandraRDD2 = cassandraRDD2.union(cassandraRDD);
但在联合中我收到以下错误。
类型不匹配:无法从 JavaRDD 转换为 CassandraJavaRDD
虽然两个 RDD 的类型相似。
所以 1) 我应该将 Cast 应用为
cassandraRDD2 = (CassandraJavaRDD<CassandraRow>) cassandraRDD2.union(cassandraRDD);
2) 或者将其中一个 RDD 的 Type 更改为 JavaRDD
【问题讨论】:
你在哪里设置cassandraRDD2
?它似乎总是一个空值。
在 if 条件下,我将 cassandraRDD2 分配给 cassandraRDD。
如何执行null.isEmpty()
?因为这就是你在那里做的事情
是的,我忘记将其更改为 if(cassandraRDD2==null) ,但是为什么我需要转换它?
我省略了部分代码,因为 if 和 else 条件在循环中运行,一旦 if 语句为真,在 cassandraRDD2 将其分配给 cassandraRDD 之后,它将进入 else 部分。然后需要转换,我希望我很清楚。我刚刚做了挂件。对此感到抱歉。
【参考方案1】:
问题发生是因为根据docs:
方法: union(JavaRDD other) 返回这个RDD和另一个RDD的联合。
返回值:JavaRDD
因此不匹配。
因为根据this:
public class CassandraJavaRDD<R> extends JavaRDD<R>
...
CassandraJavaRDD
类扩展了 JavaRDD
,因此您可以使用:
JavaRDD<CassandraRow> cassandraRDD = null;
JavaRDD<CassandraRow> cassandraRDD2 = null;
因此union()
方法的返回值将匹配其类型。
【讨论】:
感谢您的回答。 JavaRDD以上是关于无法在 Spark 中合并两个 CassandraJavaRDD<CassandraRow>的主要内容,如果未能解决你的问题,请参考以下文章
无法导入 org.apache.spark.sql.cassandra.CassandraSQLContext
无法使用 Spark cassandra 连接器 1.5.0 连接 Cassandra 3.0