Spark JDBC伪列不起作用
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark JDBC伪列不起作用相关的知识,希望对你有一定的参考价值。
对于我的用例,我试图使用spark JDBC读取一个大的oracle表。因为,我的表中没有整数类型列,我使用rownum
作为paritionColumn
。
这是我的spark查询的样子:(对于测试我使用的表只有22000行。)
val df = spark.read.jdbc(jdbcUrl = url, table = select * from table1,
columnName= "rownum", lowerBound = 0, upperBound = 22000,
numPartitions = 3, connectionProperties = oracleProperties)
理想情况下,它应该返回3个分区,每个分区有近7000行。但是当我在数据帧的每个分区上运行计数时,我可以看到只有一个分区有行,而其他分区有0。
df.rdd.mapPartitionsWithIndex{case(i, rows) => Iterator((i, rows.size))}.toDF().show()
输出:
+---+----+
| _1| _2 |
+---+----+
| 0 |7332|
| 1 | 0 |
| 2 | 0 |
+---+----+
你能否建议为什么它只在一个分区中返回行?
我的来源是Oracle数据库。使用oracle jdbc驱动程序oracle.jdbc.driver.OracleDriver
jar - > ojdbc7.jar
答案
经过一些谷歌搜索后,我能够使它工作。我做了一些
我试图使用Spark jdbc从Oracle数据库中读取。我能够使用oracle的Pseudocolumn ROWNUM将来自spark的读取与一些hacks并行化。诀窍是你必须为ROWNUM列添加别名,然后使用该别名列。
我想查询整个“table1”并在该表的spark中创建多个分区。
val df = spark.read.jdbc(jdbcUrl= url, table = "select * from table1",
columnName="ROWNUM", lowerBound = 0, upperBound = 22000,
numPartitions = 3, connectionProperties = oracleProperties)
为了在Pseudocolumn上进行分区,修改查询如下:
val df = spark.read.jdbc(jdbcUrl= url, table = "(select t1.*, ROWNUM as num_rows from (select * from table1) t1) oracle_table1",
columnName="num_rows", lowerBound = 0, upperBound = 22000,
numPartitions = 3, connectionProperties = oracleProperties)
这样我们实际上将伪列作为实际列并将其用于分区。
以上是关于Spark JDBC伪列不起作用的主要内容,如果未能解决你的问题,请参考以下文章