Spark Cassandra 连接器 - 分区键上的范围查询

Posted

技术标签:

【中文标题】Spark Cassandra 连接器 - 分区键上的范围查询【英文标题】:Spark Cassandra connector - Range query on partition key 【发布时间】:2014-11-19 23:20:15 【问题描述】:

我正在评估 spark-cassandra-connector,我正在努力尝试对分区键进行范围查询。

根据连接器的文档,似乎可以使用相等或 IN 运算符对分区键进行服务器端过滤,但不幸的是,我的分区键是时间戳,所以我不能使用它。

所以我尝试使用带有以下查询的 Spark SQL('timestamp' 是分区键):

select * from datastore.data where timestamp >= '2013-01-01T00:00:00.000Z' and timestamp < '2013-12-31T00:00:00.000Z'

虽然作业产生了 200 个任务,但查询没有返回任何数据。

我还可以确保自从在 cqlsh 上运行查询(使用 'token' 函数进行适当的转换)返回数据后,有数据要返回。

我在独立模式下使用 spark 1.1.0。 Cassandra 是 2.1.2,连接器版本是 'b1.1' 分支。 Cassandra 驱动程序是 DataStax 'master' 分支。 Cassandra 集群覆盖在具有 3 个复制因子为 1 的服务器的 spark 集群上。

Here is the job's full log

有什么线索吗?

更新:当尝试基于分区键(使用 CassandraRDD.where 方法)进行服务器端过滤时,出现以下异常:

Exception in thread "main" java.lang.UnsupportedOperationException: Range predicates on partition key columns (here: timestamp) are not supported in where. Use filter instead.

但不幸的是我不知道“过滤器”是什么......

【问题讨论】:

【参考方案1】:

您有多种选择来获得您正在寻找的解决方案。

最强大的方法是使用 Stratio 与 Cassandra 集成的 Lucene 索引,它允许您按服务器端的任何索引字段进行搜索。您的写作时间会增加,但另一方面,您将能够查询任何时间范围。您可以在 Cassandra here 中找到有关 Lucene 索引的更多信息。这个 Cassandra 的扩展版本完全集成到 deep-spark project 中,因此您可以通过它充分利用 Cassandra 中 Lucene 索引的所有优势。我建议您在执行检索中小型结果集的受限查询时使用 Lucene 索引,如果您要检索大量数据集,则应使用下面的第三个选项。

根据您的应用程序的工作方式,另一种方法可能是截断您的时间戳字段,以便您可以使用 IN 运算符查找它。问题是,据我所知,您不能使用 spark-cassandra-connector,您应该使用未与 Spark 集成的直接 Cassandra 驱动程序,或者您可以查看 deep-spark 项目一个允许这样做的新功能即将发布。您的查询将如下所示:

select * from datastore.data where timestamp IN ('2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04', ... , '2013-12-31')

,但是,正如我之前所说,我不知道它是否符合您的需求,因为您可能无法截断数据并按日期/时间对其进行分组。

您拥有的最后一个选项,但效率较低,是将完整的数据集带到您的 Spark 集群并在 RDD 上应用过滤器。

免责声明:我为 Stratio 工作 :-) 如果您需要任何帮助,请随时与我们联系。

希望对你有帮助!

【讨论】:

您好,我目前正在试验 IN 运算符(支持),但我有点担心我最多需要获取 1 年的数据,这将导致具有 365 个值的 IN 表达式。无论如何,我会检查Stratio。谢谢!【参考方案2】:

我认为 CassandraRDD 错误表明您尝试执行的查询在 Cassandra 中是不允许的,您必须将所有表加载到 CassandraRDD 中,然后对此 CassandraRDD 进行火花过滤操作。

所以你的代码(在 scala 中)应该是这样的:

val cassRDD= sc.cassandraTable("keyspace name", "table name").filter(row=> row.getDate("timestamp")>=DateFormat('2013-01-01T00:00:00.000Z')&&row.getDate("timestamp") < DateFormat('2013-12-31T00:00:00.000Z'))

如果您对进行此类查询感兴趣,您可能需要查看其他 Cassandra 连接器,例如由 Stratio 开发的连接器

【讨论】:

您好,加载所有数据是我不想做的事情。我会检查这个连接器,谢谢你的提示!

以上是关于Spark Cassandra 连接器 - 分区键上的范围查询的主要内容,如果未能解决你的问题,请参考以下文章

Spark Cassandra 连接器 - perPartitionLimit

Spark cassandra 连接器 + 加入超时

在 Spark 中对巨大数据帧进行高效过滤

Spark + cassandra:如何创建键空间?

Cassandra 表有多少个分区键?

Cassandra 数据建模分区键