如何在 Spark 中并行读取数据库中的数据
Posted
技术标签:
【中文标题】如何在 Spark 中并行读取数据库中的数据【英文标题】:How to Read Data from DB in Spark in parallel 【发布时间】:2016-08-11 20:44:16 【问题描述】:我需要使用 Spark SQL 从 DB2 数据库中读取数据(因为 Sqoop 不存在)
我知道这个函数会通过打开多个连接来并行读取数据
jdbc(url: String, table: String, columnName: String, lowerBound: Long,upperBound: Long, numPartitions: Int, connectionProperties: Properties)
我的问题是我没有这样的增量列。此外,我只需要通过 Query 读取数据,因为我的表很大。有谁知道通过 API 读取数据的方法,或者我必须自己创建一些东西
【问题讨论】:
“增量列”是什么意思?您只需为 Spark 提供服务器的 JDBC 地址 参考这里。这些示例不使用列或绑定参数。 spark.apache.org/docs/latest/… 没错。问题是我不会有两个以上的刽子手。这意味着 2 的并行性。如果我在测试中添加这些变量(字符串,下界:长,上界:长,numPartitions)一个执行者正在创建 10 个分区。但是,如果我不给这些分区,则只会发生两个 pareele 读取。我们还有其他方法可以做到这一点吗? 我不确定。我对 Spark 的 JDBC 选项不太熟悉 【参考方案1】:Saurabh,为了使用标准 Spark JDBC 数据源支持并行读取,您确实需要按照您的设想使用 numPartitions 选项。
但是你需要给 Spark 一些线索,如何将读取的 SQL 语句拆分为多个并行的语句。因此,您需要某种整数分区列,其中有明确的最大值和最小值。
如果您的 DB2 系统是 MPP 分区的,则存在一个隐式分区,您实际上可以利用这一事实并并行读取每个 DB2 数据库分区:
var df = spark.read.
format("jdbc").
option("url", "jdbc:db2://<DB2 server>:<DB2 port>/<dbname>").
option("user", "<username>").
option("password", "<password>").
option("dbtable", "<your table>").
option("partitionColumn", "DBPARTITIONNUM(<a column name>)").
option("lowerBound", "<lowest partition number>").
option("upperBound", "<largest partition number>").
option("numPartitions", "<number of partitions>").
load()
所以你可以看到 DBPARTITIONNUM() 函数是这里的分区键。
以防万一您不知道 DB2 MPP 系统的分区,您可以使用 SQL 找出它:
SELECT min(member_number), max(member_number), count(member_number)
FROM TABLE(SYSPROC.DB_MEMBERS())
如果您使用多个分区组并且不同的表可能分布在不同的分区集上,您可以使用此 SQL 来计算每个表的分区列表:
SELECT t2.DBPARTITIONNUM, t3.HOST_NAME
FROM SYSCAT.TABLESPACES as t1, SYSCAT.DBPARTITIONGROUPDEF as t2,
SYSCAT.TABLES t4, TABLE(SYSPROC.DB_MEMBERS()) as t3
WHERE t1.TBSPACEID = t4.TBSPACEID AND
t4.TABSCHEMA='<myschema>' AND
t4.TABNAME='<mytab>' AND
t1.DBPGNAME = t2.DBPGNAME AND
t2.DBPARTITIONNUM = t3.PARTITION_NUMBER;
【讨论】:
如果您的 DB2 系统是 dashDB(功能齐全的 DB2 的简化形式,可作为托管服务在云中使用,或作为本地 docker 容器部署),那么您可以从内置的在 Spark 环境中,自动为您提供 MPP 部署中的分区数据帧。然后,您需要做的就是使用特殊数据源 spark.read.format("com.ibm.idax.spark.idaxsource")... 也可以在这里查看演示笔记本:github.com/ibmdbanalytics/dashdb_analytic_tools/blob/master/… 托斯滕,这个问题比这更复杂。我知道你在这里暗示什么,但我的用例更加细微。例如,我有一个查询正在读取 50,000 条记录。它在索引上的分区上有子集,假设 A.A 列的范围是 1-100 和 10000-60100,表有四个分区。 Ans 以上将读取 2-3 个分区中的数据,其中一个分区有 100 rcd(0-100),其他分区基于表结构。这将导致数据读取最多 5 个连接。我通过扩展 Df 类和创建分区方案来做到这一点,这给了我更多的连接和读取速度。 Sarabh,我的建议适用于您拥有 MPP 分区 DB2 系统的情况。在这种情况下,不要试图通过现有的列来实现并行读取,而是并行读取现有的散列分区数据块。不确定你是否有 MPP 强硬。我不确定我是否理解您所指的表的四个“分区”?这些值的逻辑范围是否在您的 A.A 列中? 您好 Torsten,我们的数据库仅是 MPP。我们在表中有四个分区(就像我们有四个 DB2 实例节点一样)。 @TorstenSteinbach 是否可以在 dashDB 本地安装之外找到包含 com.ibm.idax.spark.idaxsource 的 jar 文件?我相信this github 项目在这里可能有一些相关性,但我认为它已经失去支持一段时间了。【参考方案2】:您不需要标识列并行读取,table
变量仅指定源。注册表后,您可以使用 Spark SQL 查询使用WHERE
子句限制从中读取的数据。如果这不是一个选项,您可以改用视图,或者如 post 中所述,您也可以使用任意子查询作为表输入。
val dataframe = sqlContext.read.format("jdbc").option("url", "jdbc:db2://localhost/sparksql").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "table").option("user", "root").option("password", "root").load()
dataframe.registerTempTable("table")
dataframe.sqlContext.sql("select * from table where dummy_flag=1").collect.foreach(println)
【讨论】:
按照***.com/questions/37468418/…,数据将被完全读取。限制数据在读取数据时不起作用 请您确认确实如此吗?根据 zero323 评论和***.com/questions/32573991/…,假设您在查询中使用where
谓词,在这种情况下应该有谓词推送。如果 DB2 不是这种情况,请告诉我,在这种情况下,我将更改我的答案以使用视图,或者您可以发布自己的答案,我将删除我的答案。【参考方案3】:
当您没有某种标识列时,最好的选择是使用所述的“谓词”选项(
https://spark.apache.org/docs/2.2.1/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,predicates:Array[String],connectionProperties:java.util.Properties):org.apache.spark.sql.DataFrame
每个谓词都应该仅使用索引列构建,并且您应该尝试确保它们均匀分布。 Spark 将为您提供的每个谓词创建一个任务,并根据可用的内核并行执行尽可能多的任务。
我见过的典型方法将使用哈希函数将唯一的字符串列转换为 int,希望您的数据库支持(可能类似于 https://www.ibm.com/support/knowledgecenter/en/SSEPGG_9.7.0/com.ibm.db2.luw.sql.rtn.doc/doc/r0055167.html)。然后你可以把它分成像
这样的桶mod(abs(yourhashfunction(yourstringid)),numOfBuckets) + 1 = bucketNumber
如果你有复合唯一性,你可以在散列之前连接它们。
您还可以通过附加命中其他索引或分区的条件来改进您的谓词(即 AND partitiondate = somemeaningfuldate)。
最后应该指出的是,这通常不如标识列好,因为它可能需要对目标索引进行全面或更广泛的扫描 - 但它仍然远远优于不做其他任何事情。
【讨论】:
以上是关于如何在 Spark 中并行读取数据库中的数据的主要内容,如果未能解决你的问题,请参考以下文章
Spark 读取 Hbase 优化 --手动划分 region 提升并行数