执行 pyspark.sql.DataFrame.take(4) 超过一小时

Posted

技术标签:

【中文标题】执行 pyspark.sql.DataFrame.take(4) 超过一小时【英文标题】:More than one hour to execute pyspark.sql.DataFrame.take(4) 【发布时间】:2016-03-08 14:23:09 【问题描述】:

我在 3 个虚拟机(即 1 个主设备;2 个从设备)上运行 spark 1.6,所有虚拟机都有 4 个内核和 16GB RAM。

我可以看到在 spark-master webUI 上注册的工人。

我想从我的 Vertica 数据库中检索数据来处理它。由于我没有设法运行复杂的查询,我尝试了虚拟查询来理解。我们认为这是一项简单的任务。

我的代码是:

df = sqlContext.read.format('jdbc').options(url='xxxx', dbtable='xxx', user='xxxx', password='xxxx').load()
four = df.take(4)

输出是(注意:我替换为@IPSLAVE从属VM IP:端口):

16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1
16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions
16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1)
16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List()
16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List()
16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB)
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1)
16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1)
16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s
16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s

如您所见,这需要很长时间。 我的表实际上很大(存储大约 2.2 亿行,每个 11 个字段),但这样的查询将使用“普通”sql(例如 pyodbc)立即执行。

我想我误解/误用了 Spark,你有什么想法或建议可以让它更好地工作吗?

【问题讨论】:

【参考方案1】:

虽然 Spark 支持通过 JDBC 进行有限谓词下推,但所有其他操作(如限制、组、聚合)都是在内部执行的。不幸的是,这意味着take(4) 将首先获取数据,然后再应用limit。换句话说,您的数据库将执行(假设没有投影和过滤器)相当于:

SELECT * FROM table 

其余的将由 Spark 处理。涉及到一些优化(特别是 Spark evaluates partitions iteratively 以获取 LIMIT 请求的记录数),但与数据库端优化相比,它仍然非常低效。

如果您想将limit 推送到数据库,您必须使用子查询作为dbtable 参数静态地执行此操作:

(sqlContext.read.format('jdbc')
    .options(url='xxxx', dbtable='(SELECT * FROM xxx LIMIT 4) tmp', ....))
sqlContext.read.format("jdbc").options(Map(
  "url"     -> "xxxx",
  "dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp",
))

请注意,子查询中的别名是强制性的。

注意

一旦数据源 API v2 准备就绪,这种行为可能会在未来得到改进:

SPARK-15689 SPIP: Data Source API V2

【讨论】:

有趣。如果一切正常,我会处理它并返回给您验证 anwser :) 谢谢! @zero323 tmp 之后的字符是否应该改为 '? Scala 示例适用于 sqlContext.read.jdbc("jdbc:sqlserver://example.com;databaseName=local;user=debug;password=debug", "(SELECT TOP 5 * FROM ExampleTable) tmp", new java.util.Properties) @RăzvanPanda 是的,应该。已修复,感谢 Options 在 Scala 中也能工作。 @zero323 似乎使用过滤器也没有转换为数据库特定的 SQL :( @RăzvanPanda 一切都取决于过滤器的类型。参见例如***.com/a/32585936/1560062 这是你的意思吗? 仅限于逻辑连词 - 特别是。

以上是关于执行 pyspark.sql.DataFrame.take(4) 超过一小时的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe

pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换

pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换

将pyspark列连接到pyspark DataFrame

通过 pyspark.sql.dataframe 将 XML 数据转换为 pandas 数据帧

如何以 xml 格式保存 pyspark sql DataFrame