Pyspark - df.cache().count() 永远运行

Posted

技术标签:

【中文标题】Pyspark - df.cache().count() 永远运行【英文标题】:Pyspark - df.cache().count() taking forever to run 【发布时间】:2021-05-10 20:11:55 【问题描述】:

我正在尝试使用我在网上阅读的计数方法强制对 PySpark 进行热切评估:

spark_df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)

spark_df.cache().count()

但是,当我尝试运行代码时,缓存计数部分需要永远运行。我的数据量相对较小(2.7GB,1500 万行),但在运行 28 分钟后,我决定终止这项工作。作为对比,当我使用 pandas.read_sql() 方法读取数据时,只用了 6 分 43 秒。

我运行代码的机器非常强大,(20 个 vCPU,160 GB RAM,Windows 操作系统)。我相信我错过了加快计数语句的步骤。

感谢任何帮助或建议。

【问题讨论】:

【参考方案1】:

当你使用 pandas 读取时,它会从机器的可用内存中使用尽可能多的内存(假设你提到的都是 160Gb,远远大于数据本身 ~3Gb)。

但是,Spark 就不一样了。当您启动 Spark 会话时,通常您必须预先提及您想要使用的每个执行程序(以及驱动程序和应用程序管理器,如果适用)有多少内存,以及如果您没有指定它,根据latest Spark documentation,它将是 1Gb。所以你要做的第一件事就是给你的执行者和驱动程序更多的内存。

其次,Spark 从 JDBC 读取是很棘手的,因为慢与否取决于执行器(和任务)的数量,而这些数字取决于您的 RDD(从 JDBC 连接读取)有多少分区,以及数量分区数取决于您的表、查询、列、条件等。强制改变行为、拥有更多分区、更多任务、更多执行程序的一种方法是通过以下配置:numPartitionspartitionColumnlowerBound,和 upperBound.

numPartitions 是分区的数量(因此将使用执行器的数量) partitionColumn 是一个整数类型的列,Spark 将使用它来定位分区 lowerBound 是您要读取的partitionColumn 的最小值 upperBound 是您要读取的partitionColumn 的最大值

你可以在这里https://***.com/a/41085557/3441510阅读更多,但基本的想法是,你想使用一个合理数量的执行者(由numPartitions定义),来处理一个均匀分布的每个执行器的数据块(由partitionColumnlowerBoundupperBound 定义)。

【讨论】:

非常有见地!我从 Spark 计划中看到了 numPartitions=1,我知道出了点问题……如果我定义了多个分区,是否相当于向数据库发送并行提示进行处理?或者 numPartitions 和并行提示的概念是否相互独立?我发现将批量大小设置为更高的数字也很有帮助。 如果您阅读上面的链接,您会看到您的主要查询被分解为许多较小的查询(其中多少取决于您的配置),然后每个查询将被发送到数据库平行线。平衡上述所有四个参数的主要目标是增加工作人员的数量并确保每个工作人员处理相当等量的数据。

以上是关于Pyspark - df.cache().count() 永远运行的主要内容,如果未能解决你的问题,请参考以下文章

重用pyspark缓存并在for循环中不持久

python + pyspark:在pyspark中进行多列比较的内部连接错误

从缓存中删除 spark 数据帧

Grafana 使用 http_server_requests_seconds_count 绘制每分钟的 HTTP 请求数

JavaScript+css实现转向灯双闪animationanimation-iteration-countkeyframesinfinite循环动画动画执行无限次

使用 cxf-spring-boot-starter-jaxrs 的 Spring Boot 应用程序中的指标 http_server_requests_seconds_count 包含 uri 作为