从 Spark 中的 DataFrame 中过滤和选择数据
Posted
技术标签:
【中文标题】从 Spark 中的 DataFrame 中过滤和选择数据【英文标题】:Filtering and selecting data from a DataFrame in Spark 【发布时间】:2018-07-24 09:31:13 【问题描述】:我正在开发一个 Spark-JDBC 程序 到目前为止,我想出了以下代码:
object PartitionRetrieval
var conf = new SparkConf().setAppName("Spark-JDBC")
val log = LogManager.getLogger("Spark-JDBC Program")
Logger.getLogger("org").setLevel(Level.ERROR)
val conFile = "/home/hmusr/ReconTest/inputdir/testconnection.properties"
val properties = new Properties()
properties.load(new FileInputStream(conFile))
val connectionUrl = properties.getProperty("gpDevUrl")
val devUserName = properties.getProperty("devUserName")
val devPassword = properties.getProperty("devPassword")
val driverClass = properties.getProperty("gpDriverClass")
val tableName = "source.bank_accounts"
try
Class.forName(driverClass).newInstance()
catch
case cnf: ClassNotFoundException =>
log.error("Driver class: " + driverClass + " not found")
System.exit(1)
case e: Exception =>
log.error("Exception: " + e.printStackTrace())
System.exit(1)
def main(args: Array[String]): Unit =
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
.option("dbtable",tableName)
.option("user",devUserName)
.option("password",devPassword).load()
val rc = gpTable.filter(gpTable("source_system_name")==="ORACLE").count()
println("gpTable Count: " + rc)
在上面的代码中,语句:val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
是否会将table: bank_accounts
的全部数据转储到DataFrame: gpTable
,然后DataFrame: rc
得到过滤后的数据。我对表有这个疑问:bank_accounts 是一个非常小的表,如果将它作为一个整体数据帧加载到内存中,它不会产生影响。但是在我们的生产中,有数十亿条记录的表。在这种情况下,使用 JDBC 连接将数据加载到 DataFrame 的推荐方法是什么?
谁能在这里告诉我 Spark-Jdbc 的入口点的概念?
【问题讨论】:
【参考方案1】:语句会不会...把table:bank_accounts的全部数据dump到DataFrame:gpTable中,然后DataFrame:rc得到过滤后的数据。
没有。 DataFrameReader
不急。它只定义数据绑定。
此外,简单的谓词,如平凡的相等,检查被推送到源,并且在执行计划时只应加载所需的列。
在数据库日志中,您应该会看到类似于
的查询 SELECT 1 FROM table WHERE source_system_name = 'ORACLE'
如果它作为一个整体数据帧加载到内存中。
没有。 Spark 不会将数据加载到内存中,除非它被指示(主要是 cache
),即使这样,它也会将自己限制在适合可用存储内存的块中。
在标准过程中,它只保留计算计划所需的数据。对于全局计划内存占用不应依赖于数据量。
在这种情况下,使用 JDBC 连接将数据加载到 DataFrame 的推荐方法是什么?
有关可扩展性的问题,请查看Partitioning in spark while reading from RDBMS via JDBC、Whats meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?、https://***.com/a/45028675/8371915。
另外你可以阅读Does spark predicate pushdown work with JDBC?
【讨论】:
以上是关于从 Spark 中的 DataFrame 中过滤和选择数据的主要内容,如果未能解决你的问题,请参考以下文章
哪个更快? Spark SQL with Where 子句或在 Spark SQL 之后在 Dataframe 中使用过滤器
Spark SQL Dataframe API - 动态构建过滤条件