spark jdbc df limit ...它在做啥?

Posted

技术标签:

【中文标题】spark jdbc df limit ...它在做啥?【英文标题】:spark jdbc df limit... what is it doing?spark jdbc df limit ...它在做什么? 【发布时间】:2016-10-03 16:32:01 【问题描述】:

我正在尝试学习如何了解 Spark 内部正在发生的事情,这是我目前的困惑。我正在尝试将 Oracle 表中的前 200 行读入 Spark:

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> "schema.table",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()

jdbcDF.limit(200).count()

我希望这会相当快。对具有 500K 行的表的类似操作在合理的时间内完成。在这种特殊情况下,表要大得多(数亿行),但我认为 limit(200) 会使其更快吗?我该如何弄清楚它把时间花在了哪里?

【问题讨论】:

【参考方案1】:

事实上,spark 还不能下推limit 谓词。

所以实际上在这种情况下发生的情况是它正在拉动所有数据以激发火花,然后进行限制和计数。您需要在子查询中将其用作表参数。

例如:

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> "(select * from schema.table limit 200) as t",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()

因此,主要花费时间的地方是提取所有数据以激发火花。

也可以在子查询中动态传递限制:

val n : Int = ???

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:...",
  "dbtable" -> s"(select * from schema.table limit $n) as t",
  "fetchSize" -> "5000",
  "partitionColumn" -> "my_row_id",
  "numPartitions" -> "16",
  "lowerBound" -> "0",
  "upperBound" -> "9999999"
  )).load()

有一个JIRA ticket (SPARK-10899)正在进行来解决这个问题,但它已经挂了将近一年了。

编辑:因为上述 JIRA 中的问题被标记为重复。您可以继续跟踪问题here - SPARK-12126。 我希望这能回答你的问题。

【讨论】:

谢谢,它回答了大部分内容(这很有意义...... Spark 应该如何知道如何限制特定 RDBMS 中的结果?)。 说实话,我没有深入研究这个主题,但我知道它必须在数据源催化剂中实现。但是 Catalyst API 仍然很神秘,并且没有太多关于它的文档。所以恐怕我无法回答关于谓词应该如何具体下推的问题。 截至 2017 年 11 月,我可以确认 Spark 2.2.0 现在能够将 limit 谓词 下推到 @​​987654328@ 我通过在SQL query(字符串)本身中包含limit clause 来实现这一点。不确定是否按此处提到的方式工作。 这不是同一件事@y2k-shubham :) 我们正在谈论您在 spark 方面定义的谓词。

以上是关于spark jdbc df limit ...它在做啥?的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 中使用 jdbc 驱动程序连接到 Hive

通过 JDBC 从 Spark 提取表数据时出现 PostgreSQL 错误

Java spark 无法执行 df.show()

Pyspark - df.cache().count() 永远运行

Spark上的Hive如何从jdbc读取数据?

如何将 Spark 数据帧写入 impala 数据库