使用 Pyspark / Dataframes 时,如何将谓词下推到 Cassandra 或限制请求的数据?
Posted
技术标签:
【中文标题】使用 Pyspark / Dataframes 时,如何将谓词下推到 Cassandra 或限制请求的数据?【英文标题】:How can you pushdown predicates to Cassandra or limit requested data when using Pyspark / Dataframes? 【发布时间】:2016-12-06 04:06:08 【问题描述】:例如在docs.datastax.com 我们提到:
table1 = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="kv", keyspace="ks").load()
这是我知道的唯一方法,但可以说我只想加载此表中的最后一百万个条目。我不想每次都将整个表加载到内存中,尤其是如果这个表有超过 1000 万个条目。
谢谢!
【问题讨论】:
你在问什么?我对你的目标感到困惑。 我的目标是将表中的数据更快地加载到 DataFrame 中。我问是否可以从数据源的表中加载部分数据或任何其他方式,而不是读取整个表中所述问题。 【参考方案1】:虽然您无法更快地加载数据。您可以加载部分数据或提前终止。 Spark DataFrames 利用催化剂优化其底层查询计划,使其能够采取一些捷径。
例如调用limit
将允许Spark 跳过从底层数据源读取某些部分。这些将通过取消正在执行的任务来限制从 Cassandra 读取的数据量。
底层数据源可以使用调用过滤器或添加过滤器来帮助限制从 Cassandra 实际提取的信息量。可以下推的内容是有限制的,但这在文档中有详细说明。
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushing-down-clauses-to-cassandra
请注意,所有这一切都是通过在调用 DataSource 后对 DataSource 进行进一步的 api 调用来完成的。例如
val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(table="kv", keyspace="ks")
.load()
df.show(10) // Will compute only enough tasks to get 10 records and no more
df.filter(clusteringKey > 5).show() //Will pass down the clustering predicate to C*
【讨论】:
所以第一条指令可能是从 Cassandra 加载数据的标准方式:“val df = sqlContext....load()”。所以即使在调用 load() 之后,响应时间仍然是即时的。然后我调用 df.count() ,这就是开始的惰性计算。我意识到另一件事,如果我应用一个过滤器来检索最近几天的条目(占总数的 30%),执行时间甚至比没有过滤器(where 条件)的时间更长。我的问题是我在一个表中有太多数据,我应该做的是尝试将数据拆分为几天或几周而不是几个月。 非常有用的解释...比文档更好以上是关于使用 Pyspark / Dataframes 时,如何将谓词下推到 Cassandra 或限制请求的数据?的主要内容,如果未能解决你的问题,请参考以下文章
PySpark:具有不同列的 DataFrames 的动态联合
如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引
Pyspark DataFrames 中的嵌套 SELECT 查询
使用 pyspark 在循环中附加 Spark DataFrames 的有效方法
PySpark DataFrames - 使用不同类型的列之间的比较进行过滤
在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe