在 spark 数据框中使用 where 子句加载数据

Posted

技术标签:

【中文标题】在 spark 数据框中使用 where 子句加载数据【英文标题】:Load data with where clause in spark dataframe 【发布时间】:2020-04-23 12:07:42 【问题描述】:

我有一个包含 n 条记录的 oracle 表,现在我想使用 where/filter 条件从该表加载数据以触发数据帧。我不想将完整的数据加载到数据框中,然后对其应用过滤器。 spark.read.format("jdbc")...etc 或任何其他解决方案中是否有任何选项?

【问题讨论】:

***.com/questions/46984914/… 这能回答你的问题吗? spark, scala & jdbc - how to limit number of records 【参考方案1】:

检查下面的代码。您可以在查询变量中编写自己的查询。要并行处理或加载数据,您可以检查 partitionColumn、lowerBound 和 upperBound 列。

val query = """
  (select columnA,columnB from table_name
    where <where conditions>) table
"""  
val options = Map(
    "url"              -> "<url>".
    "driver"           -> "<driver class>".
    "user"             -> "<user>".
    "password"         -> "<password>".
    "dbtable"          -> query,
    "partitionColumn"  -> "",
    "lowerBound"       -> "<lower bound values>", 
    "upperBound"       -> "<upper bound values>"
)

val df = spark
        .read
        .format("jdbc")
        .options(options)
        .load()

【讨论】:

我认为SQL应该有(select in parenthesis) AS table【参考方案2】:

试试这个

val sourceDf = spark.read.format("jdbc").option("driver", driver).option("url", url).option("dbtable", "(select * from dbo.employee c where c.joindate  > '2018-11-19 00:00:00.000') as subq").option("numPartitions", 6).option("partitionColumn", "depId").option("lowerBound", 1).option("upperBound", 100).option("user", user).option("password", pass).load()

它将启用 where 条件以及分区

【讨论】:

【参考方案3】:

Spark 确实支持 JDBC 源的谓词下推。

您可以简单地使用spark.read.format("jdbc") 加载数据帧并在该df 之上使用.where() 运行过滤器,然后您可以检查是否应用了spark SQL 谓词下推。

在 SparkSQL 中,您可以看到针对 db 运行的确切查询,并且您会发现添加了 WHERE 子句。

所以你不需要为它添加任何额外的东西。

更多详情请参考databrickshttps://docs.databricks.com/data/data-sources/sql-databases.html#push-down-optimization这篇文章

【讨论】:

【参考方案4】:

您可以为此用例使用以下选项。参考link

    jdbcUrl = "jdbc:sqlserver://0:1;database=2".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = 
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"

根据查询条件创建数据框:

pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

【讨论】:

以上是关于在 spark 数据框中使用 where 子句加载数据的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spark 执行“WHERE IN”子句,如何仅重新训练第一个数据集的列?

Spark Cassandra 连接器 - where 子句

哪个更快? Spark SQL with Where 子句或在 Spark SQL 之后在 Dataframe 中使用过滤器

如果 where 子句已经修复,如何加快 spark sql 过滤器查询?

为啥我的 WHERE 子句在我的组合框中不起作用

使用 Spark 读取带有 where 子句的 HBase 表