从 Spark 中的 DataFrame 中过滤和选择数据

Posted

技术标签:

【中文标题】从 Spark 中的 DataFrame 中过滤和选择数据【英文标题】:Filtering and selecting data from a DataFrame in Spark 【发布时间】:2018-07-24 09:31:13 【问题描述】:

我正在开发一个 Spark-JDBC 程序 到目前为止,我想出了以下代码:

object PartitionRetrieval 
    var conf  = new SparkConf().setAppName("Spark-JDBC")
    val log   = LogManager.getLogger("Spark-JDBC Program")
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conFile       = "/home/hmusr/ReconTest/inputdir/testconnection.properties"
    val properties    = new Properties()
    properties.load(new FileInputStream(conFile))
    val connectionUrl = properties.getProperty("gpDevUrl")
    val devUserName   = properties.getProperty("devUserName")
    val devPassword   = properties.getProperty("devPassword")
    val driverClass   = properties.getProperty("gpDriverClass")
    val tableName     = "source.bank_accounts"
    try 
    Class.forName(driverClass).newInstance()
     catch 
    case cnf: ClassNotFoundException =>
        log.error("Driver class: " + driverClass + " not found")
        System.exit(1)
    case e: Exception =>
        log.error("Exception: " + e.printStackTrace())
        System.exit(1)
    
    def main(args: Array[String]): Unit = 
        val spark   = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
        val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
                                                        .option("dbtable",tableName)
                                                        .option("user",devUserName)
                                                        .option("password",devPassword).load()
        val rc = gpTable.filter(gpTable("source_system_name")==="ORACLE").count()
        println("gpTable Count: " + rc)
    

在上面的代码中,语句:val gpTable = spark.read.format("jdbc").option("url", connectionUrl)是否会将table: bank_accounts的全部数据转储到DataFrame: gpTable,然后DataFrame: rc得到过滤后的数据。我对表有这个疑问:bank_accounts 是一个非常小的表,如果将它作为一个整体数据帧加载到内存中,它不会产生影响。但是在我们的生产中,有数十亿条记录的表。在这种情况下,使用 JDBC 连接将数据加载到 DataFrame 的推荐方法是什么? 谁能在这里告诉我 Spark-Jdbc 的入口点的概念?

【问题讨论】:

【参考方案1】:

语句会不会...把table:bank_accounts的全部数据dump到DataFrame:gpTable中,然后DataFrame:rc得到过滤后的数据。

没有。 DataFrameReader 不急。它只定义数据绑定。

此外,简单的谓词,如平凡的相等,检查被推送到源,并且在执行计划时只应加载所需的列。

在数据库日志中,您应该会看到类似于

的查询
 SELECT 1 FROM table WHERE source_system_name = 'ORACLE'

如果它作为一个整体数据帧加载到内存中。

没有。 Spark 不会将数据加载到内存中,除非它被指示(主要是 cache),即使这样,它也会将自己限制在适合可用存储内存的块中。

在标准过程中,它只保留计算计划所需的数据。对于全局计划内存占用不应依赖于数据量。

在这种情况下,使用 JDBC 连接将数据加载到 DataFrame 的推荐方法是什么?

有关可扩展性的问题,请查看Partitioning in spark while reading from RDBMS via JDBC、Whats meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?、https://***.com/a/45028675/8371915。

另外你可以阅读Does spark predicate pushdown work with JDBC?

【讨论】:

以上是关于从 Spark 中的 DataFrame 中过滤和选择数据的主要内容,如果未能解决你的问题,请参考以下文章

在新列上过滤 Spark DataFrame

哪个更快? Spark SQL with Where 子句或在 Spark SQL 之后在 Dataframe 中使用过滤器

Spark SQL Dataframe API - 动态构建过滤条件

使用 Spark DataFrame 的地理过滤器

Spark Hive:通过另一个 DataFrame 的列的值过滤一个 DataFrame 的行

如何从 Spark 2.0 中的 DataFrame 列创建数据集?