如何在 Spark 中过滤来自 Cassandra 的空数据?

Posted

技术标签:

【中文标题】如何在 Spark 中过滤来自 Cassandra 的空数据?【英文标题】:How to filter the null data from Cassandra in Spark? 【发布时间】:2017-07-20 07:32:07 【问题描述】:

我想过滤从 Cassandra 中选择的 Null 值。

这是我的查询:

scala> var rdd =  sc.cassandraTable("keyspace", "table").select("month", "timetag", "name").where("month = ?", "201704")
scala> var data = rdd.filter(_.getString("name") != null)

如果我使用 getString("xxx") 比较 null,它会显示

"java.lang.NullPointerException:第 2 列的意外空值。 使用 get[Option[...]] 接收空值。”

之后,我尝试使用 getStringOption 进行比较。

scala> var rdd =  sc.cassandraTable("keyspace", "table").select("month", "timetag", "name").where("month = ?", "201704")
scala> var data = rdd.filter(_.getStringOption("name") != null)

这一次,它没有显示任何错误消息。但是数据没有被过滤。空数据仍然存在。

有谁知道 getStringgetStringOption 有什么不同吗? 或者有没有其他方法可以比较 Cassandra 中的数据 是否为 null

非常感谢!

【问题讨论】:

【参考方案1】:

尝试做:

_.getStringOption("name").isDefined

代替:

 _.getStringOption("name") != null

更新

对选项部分进行一些扩展可能是相关的。

getStringOption(内部实现为 get[Option[String]] 返回一个 scala Option 类。

Option 是函数式编程中的一种惯用方式,用于表示具有某些值(表示为 Some(value))或没有值(表示为 None)的东西。

与java中使用null不同,None实际上是一个具有各种功能的对象(这里的例子是isDefined)。这使得在对象上组合多个操作变得容易,同时确保如果在路上的某些东西没有定义,那么我们不会得到空指针异常。

有关选项类型的更多信息,请参阅the scala class documentation 或 this stack overflow answer 或 this stack overflow answer

【讨论】:

现在可以使用了!!但是我怎么知道我可以为 getStringOption 使用什么方法呢?有这样的文件吗?真的谢谢!! @EmmaLin getStringOption 返回一个 scala 选项。见scala-lang.org/api/current/scala/Option.html @AssafMendelson 添加请简单解释它是一个选项以及我们应该如何检查选项:) @T.Gawęda 完成。【参考方案2】:

当您希望收到 Cassandra 空值时使用 getOption 变体

_.get[Option[String]]("name")
res8: Option[String] = Some(abc)

【讨论】:

这是 OP 使用的 getStringOption 的实现。

以上是关于如何在 Spark 中过滤来自 Cassandra 的空数据?的主要内容,如果未能解决你的问题,请参考以下文章

spark如何在cassandra表之间复制数据?

在 Spark 中对巨大数据帧进行高效过滤

Spark + cassandra:如何创建键空间?

使用 Spark/Cassandra 的时间序列 - 如何在值满足条件时找到时间戳?

通过 Spark SQL 查询 Cassandra UDT

如何在从 Spark 消费 Kafka 时获取偏移 id,将其保存在 Cassandra 中并使用它来重新启动 Kafka?