使用 Pyspark / Dataframes 时,如何将谓词下推到 Cassandra 或限制请求的数据?

Posted

技术标签:

【中文标题】使用 Pyspark / Dataframes 时,如何将谓词下推到 Cassandra 或限制请求的数据?【英文标题】:How can you pushdown predicates to Cassandra or limit requested data when using Pyspark / Dataframes? 【发布时间】:2016-12-06 04:06:08 【问题描述】:

例如在docs.datastax.com 我们提到:

table1 = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="kv", keyspace="ks").load()

这是我知道的唯一方法,但可以说我只想加载此表中的最后一百万个条目。我不想每次都将整个表加载到内存中,尤其是如果这个表有超过 1000 万个条目。

谢谢!

【问题讨论】:

你在问什么?我对你的目标感到困惑。 我的目标是将表中的数据更快地加载到 DataFrame 中。我问是否可以从数据源的表中加载部分数据或任何其他方式,而不是读取整个表中所述问题。 【参考方案1】:

虽然您无法更快地加载数据。您可以加载部分数据或提前终止。 Spark DataFrames 利用催化剂优化其底层查询计划,使其能够采取一些捷径。

例如调用limit 将允许Spark 跳过从底层数据源读取某些部分。这些将通过取消正在执行的任务来限制从 Cassandra 读取的数据量。

底层数据源可以使用调用过滤器或添加过滤器来帮助限制从 Cassandra 实际提取的信息量。可以下推的内容是有限制的,但这在文档中有详细说明。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushing-down-clauses-to-cassandra

请注意,所有这一切都是通过在调用 DataSource 后对 DataSource 进行进一步的 api 调用来完成的。例如

val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(table="kv", keyspace="ks")
  .load()

df.show(10) // Will compute only enough tasks to get 10 records and no more
df.filter(clusteringKey > 5).show() //Will pass down the clustering predicate to C*

【讨论】:

所以第一条指令可能是从 Cassandra 加载数据的标准方式:“val df = sqlContext....load()”。所以即使在调用 load() 之后,响应时间仍然是即时的。然后我调用 df.count() ,这就是开始的惰性计算。我意识到另一件事,如果我应用一个过滤器来检索最近几天的条目(占总数的 30%),执行时间甚至比没有过滤器(where 条件)的时间更长。我的问题是我在一个表中有太多数据,我应该做的是尝试将数据拆分为几天或几周而不是几个月。 非常有用的解释...比文档更好

以上是关于使用 Pyspark / Dataframes 时,如何将谓词下推到 Cassandra 或限制请求的数据?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:具有不同列的 DataFrames 的动态联合

如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引

Pyspark DataFrames 中的嵌套 SELECT 查询

使用 pyspark 在循环中附加 Spark DataFrames 的有效方法

PySpark DataFrames - 使用不同类型的列之间的比较进行过滤

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe