Apache Spark 如何在内存中工作?

Posted

技术标签:

【中文标题】Apache Spark 如何在内存中工作?【英文标题】:How does Apache Spark works in memory? 【发布时间】:2018-09-10 17:19:13 【问题描述】:

在 where 子句中使用非索引列查询 Cassandra 时,Spark-Cassandra-Connector 的 official documentation 说,

要过滤行,您可以使用 Spark 提供的过滤器转换。但是,这种方法会导致从 Cassandra 获取所有行,然后由 Spark 过滤。

我对此有点困惑。例如,如果我有十亿行这个 db 结构:ID、City、State 和 Country,其中只有 ID 被索引。如果我在 where 子句中使用 City = 'Chicago',Spark 会首先下载所有十亿行,然后过滤掉 City = 'Chicago' 的行吗?或者它会从 Cassandra 读取一些数据块,运行过滤器,存储符合条件的行,然后获取更多数据块,获取符合条件的行,然后再次将它们放在一边……然后继续这个过程。如果在任何时候,RAM 和/或磁盘存储空间不足,删除/卸载/摆脱不符合条件的数据,并获取新的数据块以继续该过程?

另外,谁能告诉我一个通用公式来计算保存一个 bigdecimal 列和 3 个十亿行的文本列需要多少磁盘空间?

【问题讨论】:

【参考方案1】:

过滤行可以在数据库或 Spark 中进行。文档建议的是尽可能尝试过滤数据库中的记录,而不是在 spark 中进行。这意味着什么:

sc.cassandraTable("test", "cars")
  .select("id", "model")
  .where("color = ?", "black")

上述语句将在 Cassandra 数据库中运行color = 'black' 过滤器,因此 Spark 不会将除黑色以外的任何颜色的记录提取到其内存中。 Spark 可能不会将十亿条记录拉入内存,而是仅加载几百万条恰好在 color 列中具有黑色值的记录。

相比之下,spark中可以进行过滤:

sc.cassandraTable("test", "cars")
  .select("id", "model")
  .filter(car -> "black".equals(car.getColor()))

最后一个版本会将所有数十亿条记录加载到 Spark 的内存中,然后在 在 Spark 中按颜色过滤它们。显然,这不能比以前的版本更可取,后者最大限度地减少了 Spark 集群所需的内存量。所以对于任何可以在数据库中处理的简单过滤,都应该使用数据库/驱动/查询过滤器。

关于估计内存需求,还有其他问题提出了各种方法,请查看this和this。 spark's documentation也有很好的建议:

您需要多少内存取决于您的应用程序。要确定您的应用程序使用了多少数据集大小,请将数据集的一部分加载到 Spark RDD 中,并使用 Spark 监控 UI (http://:4040) 的“存储”选项卡查看其在内存中的大小。请注意,内存使用受存储级别和序列化格式的影响很大 - 请参阅调优指南了解如何减少内存使用的技巧。

【讨论】:

在 test.cars 示例中,我假设 color 已编入索引。否则不会抛出ALLOW FILTERING 错误吗? @FarazDurrani 没错。 Cassandra 的主键/分区键相关的查询规则仍然适用。我认为按该字段过滤是合法的。 如果你想这样做并且不能在表字段上创建二级索引,那么你只能使用spark的过滤器(rdd或数据框过滤)。 我认为即使我在 where 子句中使用非索引列,它仍然会在 DB (Cassandra) 级别发生。而我记忆中的只有那些符合条件的行。 好的。让我们明确两件事。首先,只有在您使用.where(cql predicate)(而不是.filter)时才会在数据库中进行过滤。其次,您对allow filtering 的看法是正确的(即使连接器仍可能遇到数据库错误 - 文档说 Cassandra 引擎并不允许所有谓词)。最后,由数据所有者决定运行查询的有效方式是什么,以及可以调整什么(索引,或者更确切地说是在 spark 中运行过滤器)...【参考方案2】:

spark cassandra 连接器将发出具有特定令牌范围的多个查询(每个 spark 任务 1 个)。所以总的来说,这将是一个全表扫描,但它会一次完成一个,并且是并行的。如果您在每个 cassandra 节点上运行 spark worker,则连接器将选择令牌范围以匹配本地 cassandra 节点。这将限制跨网络的数据洗牌。但是会发生全表扫描,这并不理想。

【讨论】:

即使是全表扫描,我的 RAM 中最终会是什么?符合条件的行对吗?

以上是关于Apache Spark 如何在内存中工作?的主要内容,如果未能解决你的问题,请参考以下文章

“Exchange hashpartitioning”如何在 spark 中工作

为啥朴素贝叶斯不能像逻辑回归一样在 Spark MLlib 管道中工作?

为什么kryo注册不能在SparkSession中工作?

Python脚本会使用pyspark在spark中工作吗

sc.parallelize 不能在训练算法的 ML 管道中工作

如何设置 Python 脚本以在 Apache 2.0 中工作?