Pyspark:依靠 pyspark.sql.dataframe.DataFrame 需要很长时间

Posted

技术标签:

【中文标题】Pyspark:依靠 pyspark.sql.dataframe.DataFrame 需要很长时间【英文标题】:Pyspark: count on pyspark.sql.dataframe.DataFrame takes long time 【发布时间】:2020-01-31 16:56:33 【问题描述】:

我有一个pyspark.sql.dataframe.DataFrame,如下所示

df.show()
+--------------------+----+----+---------+----------+---------+----------+---------+
|                  ID|Code|bool|      lat|       lon|       v1|        v2|       v3|
+--------------------+----+----+---------+----------+---------+----------+---------+
|5ac52674ffff34c98...|IDFA|   1|42.377167| -71.06994|17.422535|1525319638|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37747|-71.069824|17.683573|1525319639|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37757| -71.06942|22.287935|1525319640|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37761| -71.06943|19.110023|1525319641|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.377243| -71.06952|18.904774|1525319642|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378254| -71.06948|20.772903|1525319643|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37801| -71.06983|18.084948|1525319644|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378693| -71.07033| 15.64326|1525319645|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378723|-71.070335|21.093477|1525319646|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37868| -71.07034|21.851894|1525319647|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378716| -71.07029|20.583202|1525319648|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37872| -71.07067|19.738768|1525319649|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.379112| -71.07097|20.480911|1525319650|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37952|  -71.0708|20.526752|1525319651| 44.93808|
|5ac52674ffff34c98...|IDFA|   1| 42.37902| -71.07056|20.534052|1525319652| 44.93808|
|5ac52674ffff34c98...|IDFA|   1|42.380203|  -71.0709|19.921381|1525319653| 44.93808|
|5ac52674ffff34c98...|IDFA|   1| 42.37968|-71.071144| 20.12599|1525319654| 44.93808|
|5ac52674ffff34c98...|IDFA|   1|42.379696| -71.07114|18.760069|1525319655| 36.77853|
|5ac52674ffff34c98...|IDFA|   1| 42.38011| -71.07123|19.155525|1525319656| 36.77853|
|5ac52674ffff34c98...|IDFA|   1| 42.38022|  -71.0712|16.978994|1525319657| 36.77853|
+--------------------+----+----+---------+----------+---------+----------+---------+
only showing top 20 rows

如果尝试count

%%time
df.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 28.1 s

30241272

现在,如果我采用 df 的一个子集,计算的时间会更长。

id0 = df.first().ID  ## First ID
tmp = df.filter( (df['ID'] == id0) )

%%time
tmp.count()

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 1min 33s
Out[6]:
3299

【问题讨论】:

【参考方案1】:

你的问题很吸引人,也很棘手..

为了重现您的行为,我使用大型数据集进行了测试。

问题描述

我在一个大型数据集中测试了以下两种情况:

# Case 1
df.count() # Execution time: 37secs

# Case 2
df.filter((df['ID'] == id0)).count() #Execution time: 1.39 min

说明

让我们看看只有.count()的物理计划:

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#38L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#41L])
      +- *(1) FileScan csv [] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

让我们看看物理计划.filter()然后.count()

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#61L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#64L])
      +- *(1) Project
         +- *(1) Filter (isnotnull(ID#11) && (ID#11 = Muhammed MacIntyre))
            +- *(1) FileScan csv [ID#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [IsNotNull(ID), EqualTo(ID,Muhammed MacIntyre)], ReadSchema: struct<_c1:string>

一般而言,Spark 在计算行数时会映射 count=1 的行,并减少所有映射器以创建最终的行数。

案例 2 中,Spark 必须首先过滤,然后为每个分区创建部分计数,然后再有另一个阶段将它们汇总在一起。因此,对于相同的行,在第二种情况下,Spark 也会进行过滤,这会影响大型数据集中的计算时间。 Spark 是一个分布式处理框架,没有 Pandas 之类的索引,它可以非常快速地进行过滤而无需传递所有行。

总结

在这种简单的情况下,您无法做很多事情来缩短执行时间。 您可以使用不同的配置设置尝试您的应用程序(例如 #spark.sql.shuffle.partitions、# spark.default.parallelism# of executors# executor memory 等)

【讨论】:

【参考方案2】:

这是因为 spark 是 lazily evaluated。当您调用 tmp.count() 时,这是一个操作步骤。换句话说,您的 tmp.count 时间还包括过滤时间。如果您想真正比较这两个计数,请尝试以下操作:

%%time
df.count()

id0 = df.first().ID  ## First ID
tmp = df.filter( (df['ID'] == id0) )
tmp.persist().show()

%%time
tmp.count()

这里的重要组件是执行计数的 tmp.persist().show() BEFORE。这将执行过滤器并缓存结果。这样,tmp.count() 只包含实际的计数时间。

【讨论】:

情况越来越糟。 您可以在更改后发布输出吗? persist(或缓存)方法返回一个新的 DF.. 使用 tmp = tmp.persist() 然后 tmp.count() 代替。 @blackbishop 在 Python 中,不需要在新变量中赋值,可以使用相同的tmp.persist().count(),性能是一样的。 @blackbishop 实际上确实改变了计数的性能。持久化数据帧无需执行过滤器和计数,只需执行计数即可。

以上是关于Pyspark:依靠 pyspark.sql.dataframe.DataFrame 需要很长时间的主要内容,如果未能解决你的问题,请参考以下文章

pyspark.sql.functions.col 和 pyspark.sql.functions.lit 之间的 PySpark 区别

Pyspark 安装错误:没有名为“pyspark”的模块

Pyspark:将 sql 查询转换为 pyspark?

Pyspark - ImportError:无法从“pyspark”导入名称“SparkContext”

Pyspark:基于所有列减去/差异 pyspark 数据帧

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe