无法在 Spark 中合并两个 CassandraJavaRDD<CassandraRow>

Posted

技术标签:

【中文标题】无法在 Spark 中合并两个 CassandraJavaRDD<CassandraRow>【英文标题】:Can not union two CassandraJavaRDD<CassandraRow> in Spark 【发布时间】:2016-03-07 13:12:24 【问题描述】:

由于从 Cassandra 查询数据有限制,我正在尝试使用 Spark 批量读取数据并将其存储在 RDD 中。

然后我添加所有的 RDD ,使用联合函数。

这是我的代码。

private void getDataFromCassandra(JavaSparkContext sc) 


    CassandraJavaRDD<CassandraRow> cassandraRDD = null ;
    CassandraJavaRDD<CassandraRow> cassandraRDD2  = null;

    While(Some Condition)

     cassandraRDD = CassandraJavaUtil
                .javaFunctions(sc).cassandraTable("dmp", "table").select("abc", "xyz")
                .where("pid IN ('" + sb + "')");

    if(cassandraRDD2==null)


     cassandraRDD2=cassandraRDD;
    
    else
        cassandraRDD2 =  cassandraRDD2.union(cassandraRDD);
    
             

但在联合中我收到以下错误。

类型不匹配:无法从 JavaRDD 转换为 CassandraJavaRDD

虽然两个 RDD 的类型相似。

所以 1) 我应该将 Cast 应用为

 cassandraRDD2 =  (CassandraJavaRDD<CassandraRow>) cassandraRDD2.union(cassandraRDD);

2) 或者将其中一个 RDD 的 Type 更改为 JavaRDD

【问题讨论】:

你在哪里设置cassandraRDD2?它似乎总是一个空值。 在 if 条件下,我将 cassandraRDD2 分配给 cassandraRDD。 如何执行null.isEmpty()?因为这就是你在那里做的事情 是的,我忘记将其更改为 if(cassandraRDD2==null) ,但是为什么我需要转换它? 我省略了部分代码,因为 if 和 else 条件在循环中运行,一旦 if 语句为真,在 cassandraRDD2 将其分配给 cassandraRDD 之后,它将进入 else 部分。然后需要转换,我希望我很清楚。我刚刚做了挂件。对此感到抱歉。 【参考方案1】:

问题发生是因为根据docs:

方法: union(JavaRDD other) 返回这个RDD和另一个RDD的联合。

返回值:JavaRDD

因此不匹配。

因为根据this:

public class CassandraJavaRDD<R> extends JavaRDD<R> 
...

CassandraJavaRDD 类扩展了 JavaRDD,因此您可以使用:

JavaRDD<CassandraRow> cassandraRDD = null;
JavaRDD<CassandraRow> cassandraRDD2 = null;

因此union() 方法的返回值将匹配其类型。

【讨论】:

感谢您的回答。 JavaRDD cassandraRDD2 = sc.emptyRDD(); JavaRDD cassandraRDD = sc.emptyRDD();我可以将这两个空 RDD 合并为 cassandraRDD2 = cassandraRDD2.union(cassandraRDD); ? 你应该可以做到的。

以上是关于无法在 Spark 中合并两个 CassandraJavaRDD<CassandraRow>的主要内容,如果未能解决你的问题,请参考以下文章

无法导入 org.apache.spark.sql.cassandra.CassandraSQLContext

无法使用 Spark cassandra 连接器 1.5.0 连接 Cassandra 3.0

使用 Spark SQL 在 cassandra 中加入两个表 - 错误:缺少 EOF

Spark SQL下推Cassandra UDF?

Spark cassandra 连接器 + 加入超时

Spark-Cassandra 写入所需的时间比预期的要长